#!/usr/bin/env python
# encoding: utf-8
"""
*find and fit the continuum trace across all echelle orders with low-order polynomials.*
Author
: David Young & Marco Landoni
Date Created
: September 10, 2020
"""
################# GLOBAL IMPORTS ####################
from soxspipe.commonutils.toolkit import read_spectral_format
from soxspipe.commonutils.toolkit import cut_image_slice
from soxspipe.commonutils.toolkit import get_calibration_lamp
from soxspipe.commonutils.dispersion_map_to_pixel_arrays import (
dispersion_map_to_pixel_arrays,
)
from soxspipe.commonutils.filenamer import filenamer
from soxspipe.commonutils.polynomials import (
chebyshev_xy_polynomial,
chebyshev_order_xy_polynomials,
)
from soxspipe.commonutils import detector_lookup
from soxspipe.commonutils import keyword_lookup
import collections
from random import random
from os.path import expanduser
from fundamentals import tools
from builtins import object
import sys
import math
import os
from io import StringIO
import copy
from contextlib import suppress
from datetime import datetime
from line_profiler import profile
os.environ["TERM"] = "vt100"
class _base_detect(object):
def fit_order_polynomial(self, pixelList, order, axisBDeg, axisACol, axisBCol, exponentsIncluded=False):
"""*iteratively fit the dispersion map polynomials to the data, clipping residuals with each iteration*
**Key Arguments:**
- ``pixelList`` -- data-frame group containing x,y pixel array
- ``order`` -- the order to fit
- ``axisBDeg`` -- degree for polynomial to fit
- ``axisACol`` -- name of columns containing axis to be fitted
- ``axisBCol`` -- name of columns containing free axis (values known)
- ``exponentsIncluded`` -- the exponents have already been calculated in the dataframe so no need to regenerate. Default *False*
**Return:**
- ``coeffs`` -- the coefficients of the polynomial fit
- ``pixelList`` -- the pixel list but now with fits and residuals included
"""
self.log.debug("starting the ``fit_order_polynomial`` method")
import numpy as np
from astropy.stats import sigma_clip
from scipy.optimize import curve_fit
arm = self.arm
self.axisBDeg = axisBDeg
clippedCount = 1
poly = chebyshev_xy_polynomial(
log=self.log,
axisBCol=axisBCol,
axisBDeg=self.axisBDeg,
exponentsIncluded=exponentsIncluded,
).poly
clippingSigma = self.recipeSettings["poly-fitting-residual-clipping-sigma"]
clippingIterationLimit = self.recipeSettings["poly-clipping-iteration-limit"]
iteration = 0
mask = pixelList["order"] == order
pixelListFiltered = pixelList.loc[mask]
coeff = np.ones((self.axisBDeg + 1))
while clippedCount > 0 and iteration < clippingIterationLimit:
pixelListFiltered = pixelList.loc[mask]
startCount = len(pixelListFiltered.index)
iteration += 1
# USE LEAST-SQUARED CURVE FIT TO FIT CHEBY POLY
# NOTE X AND Y COLUMN ARE CORRECLY IN xdata AND ydata - WANT TO
# FIND X (UNKNOWN) WRT Y (KNOWNN)
try:
coeff, pcov_x = curve_fit(
poly,
xdata=pixelListFiltered,
ydata=pixelListFiltered[axisACol].values,
p0=coeff,
maxfev=30000,
)
except TypeError as e:
# REMOVE THIS ORDER FROM PIXEL LIST
pixelList = pixelList.loc[~mask]
coeff = None
return coeff, pixelList
except Exception as e:
raise e
res, res_mean, res_std, res_median, xfit, poly = self.calculate_residuals(
orderPixelTable=pixelListFiltered,
coeff=coeff,
axisACol=axisACol,
axisBCol=axisBCol,
)
pixelList.loc[mask, "x_fit_res"] = res
pixelList.loc[mask, "x_fit"] = xfit
# SIGMA-CLIP THE DATA
masked_residuals = sigma_clip(
res,
sigma_lower=clippingSigma,
sigma_upper=clippingSigma,
maxiters=1,
cenfunc="median",
stdfunc="mad_std",
)
pixelList.loc[mask, "mask"] = masked_residuals.mask
# REMOVE FILTERED ROWS FROM DATA FRAME
removeMask = pixelList["mask"] == True
pixelList = pixelList.loc[~removeMask]
pixelListFiltered = pixelList.loc[mask]
clippedCount = startCount - len(pixelListFiltered.index)
sys.stdout.flush()
sys.stdout.write("\x1b[1A\x1b[2K")
self.log.print(
f"\t\tORDER {order:0.0f}: {clippedCount} pixel positions where clipped in iteration {iteration} of fitting the polynomial"
)
self.log.debug("completed the ``fit_order_polynomial`` method")
return coeff, pixelList
def fit_global_polynomial(
self,
pixelList,
axisACol="cont_x",
axisBCol="cont_y",
orderCol="order",
exponentsIncluded=False,
writeQCs=False,
):
"""*iteratively fit the global polynomial to the data, fitting axisA as a function of axisB, clipping residuals with each iteration*
**Key Arguments:**
- ``pixelList`` -- data-frame group containing x,y pixel array
- ``exponentsIncluded`` -- the exponents have already been calculated in the dataframe so no need to regenerate. Default *False*
**Return:**
- ``coeffs`` -- the coefficients of the polynomial fit
- ``pixelList`` -- the pixel list but now with fits and residuals included
- ``allClipped`` -- data that was sigma-clipped
"""
self.log.debug("starting the ``fit_global_polynomial`` method")
import numpy as np
from astropy.stats import sigma_clip
from scipy.optimize import curve_fit
import pandas as pd
arm = self.arm
clippedCount = 1
poly = chebyshev_order_xy_polynomials(
log=self.log,
axisBCol=axisBCol,
orderCol=orderCol,
orderDeg=self.orderDeg,
axisBDeg=self.axisBDeg,
axisB=self.axisB,
exponentsIncluded=exponentsIncluded,
).poly
clippingSigmaHigh = self.recipeSettings["poly-fitting-residual-clipping-sigma"]
clippingIterationLimit = self.recipeSettings["poly-clipping-iteration-limit"]
if axisACol == "stddev":
clippingSigmaLow = clippingSigmaHigh
clippingSigmaHigh = clippingSigmaLow * 2
else:
clippingSigmaLow = clippingSigmaHigh
iteration = 0
allClipped = []
if "cont_x" in pixelList.columns:
mask = pixelList["cont_x"].isna() | pixelList["cont_y"].isna()
pixelList.loc[mask, "pre-clipped"] = True
# REMOVE FILTERED ROWS FROM DATA FRAME
if "pre-clipped" in pixelList.columns:
removeMask = pixelList["pre-clipped"] == True
allClipped.append(pixelList.loc[removeMask])
pixelList = pixelList.loc[~removeMask]
coeff = np.ones((self.axisBDeg + 1) * (self.orderDeg + 1))
while clippedCount > 0 and iteration < clippingIterationLimit:
startCount = len(pixelList.index)
iteration += 1
# USE LEAST-SQUARED CURVE FIT TO FIT CHEBY POLY
if len(pixelList.index) == 0:
# REMOVE THIS ORDER FROM PIXEL LIST
coeff = None
return coeff, pixelList, pixelList
if iteration < 3:
coeff = np.ones((self.axisBDeg + 1) * (self.orderDeg + 1))
try:
coeff, pcov_x = curve_fit(
poly,
xdata=pixelList,
ydata=pixelList[axisACol].values,
p0=coeff,
maxfev=30000,
)
except TypeError as e:
# REMOVE THIS ORDER FROM PIXEL LIST
coeff = None
return coeff, pixelList, pixelList
except Exception as e:
raise e
res, res_mean, res_std, res_median, xfit = self.calculate_residuals(
orderPixelTable=pixelList,
coeff=coeff,
orderCol=orderCol,
axisACol=axisACol,
axisBCol=axisBCol,
)
pixelList[f"{axisACol}_fit_res"] = res
pixelList[f"{axisACol}_fit"] = xfit
# SIGMA-CLIP THE DATA
masked_residuals = sigma_clip(
res,
sigma_lower=clippingSigmaLow,
sigma_upper=clippingSigmaHigh,
maxiters=2,
cenfunc="mean",
stdfunc="std",
)
pixelList["mask"] = masked_residuals.mask
# REMOVE FILTERED ROWS FROM DATA FRAME
removeMask = pixelList["mask"] == True
allClipped.append(pixelList.loc[removeMask])
pixelList = pixelList.loc[~removeMask]
clippedCount = startCount - len(pixelList.index)
if iteration > 1:
sys.stdout.flush()
sys.stdout.write("\x1b[1A\x1b[2K")
self.log.print(
f"\t\tGLOBAL FIT: {clippedCount} pixel positions where clipped in iteration {iteration} of fitting the polynomial"
)
if len(allClipped):
allClipped = pd.concat(allClipped, ignore_index=True)
res, res_mean, res_std, res_median, xfit = self.calculate_residuals(
orderPixelTable=pixelList,
coeff=coeff,
orderCol=orderCol,
axisACol=axisACol,
axisBCol=axisBCol,
writeQCs=writeQCs,
)
self.log.debug("completed the ``fit_global_polynomial`` method")
return coeff, pixelList, allClipped
def calculate_residuals(self, orderPixelTable, coeff, axisACol, axisBCol, orderCol=False, writeQCs=False):
"""*calculate residuals of the polynomial fits against the observed line postions*
**Key Arguments:**
- ``orderPixelTable`` -- data-frame containing pixel list for given order
- ``coeff`` -- the coefficients of the fitted polynomial
- ``axisACol`` -- name of x-pixel column
- ``axisBCol`` -- name of y-pixel column
- ``orderCol`` -- name of the order column (global fits only)
- ``writeQCs`` -- write the QCs to dataframe? Default *False*
**Return:**
- ``res`` -- x residuals
- ``mean`` -- the mean of the residuals
- ``std`` -- the stdev of the residuals
- ``median`` -- the median of the residuals
- ``xfit`` -- fitted x values
"""
self.log.debug("starting the ``calculate_residuals`` method")
import numpy as np
import pandas as pd
arm = self.arm
poly = chebyshev_order_xy_polynomials(
log=self.log,
axisBCol=axisBCol,
orderCol=orderCol,
orderDeg=self.orderDeg,
axisBDeg=self.axisBDeg,
).poly
# CALCULATE RESIDUALS BETWEEN GAUSSIAN PEAK LINE POSITIONS AND POLY
# FITTED POSITIONS
thisFit = poly(orderPixelTable, *coeff)
res = thisFit - orderPixelTable[axisACol].values
# GET UNIQUE VALUES IN COLUMN
uniqueorders = len(orderPixelTable["order"].unique())
# CALCULATE COMBINED RESIDUALS AND STATS
res_mean = np.ma.mean(np.ma.abs(res))
res_std = np.ma.std(res)
res_median = np.ma.median(np.ma.abs(res))
if writeQCs:
utcnow = datetime.utcnow()
utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S")
tag = "continuum"
if "order-centres" in self.recipeName.lower():
tag = "order centres"
self.qc = pd.concat(
[
self.qc,
pd.Series(
{
"soxspipe_recipe": self.recipeName,
"qc_name": f"{self.axisA.upper()} RES MIN",
"qc_value": f"{res.min():0.2f}",
"qc_comment": f"[px] Minimum residual in {tag} fit along {self.axisA}-axis",
"qc_unit": "px",
"obs_date_utc": self.dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
self.qc = pd.concat(
[
self.qc,
pd.Series(
{
"soxspipe_recipe": self.recipeName,
"qc_name": f"{self.axisA.upper()} RES MAX",
"qc_value": f"{res.max():0.2f}",
"qc_comment": f"[px] Maximum residual in {tag} fit along {self.axisA}-axis",
"qc_unit": "px",
"obs_date_utc": self.dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
self.qc = pd.concat(
[
self.qc,
pd.Series(
{
"soxspipe_recipe": self.recipeName,
"qc_name": f"{self.axisA.upper()} RES SD",
"qc_value": f"{res_std:0.2f}",
"qc_comment": f"[px] Std-dev of residual {tag} fit along {self.axisA}-axis",
"qc_unit": "px",
"obs_date_utc": self.dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
self.qc = pd.concat(
[
self.qc,
pd.Series(
{
"soxspipe_recipe": self.recipeName,
"qc_name": f"{self.axisA.upper()} RES MEDIAN",
"qc_value": f"{res_mean:0.2f}",
"qc_comment": f"[px] Median abolute residual {tag} fit along {self.axisA}-axis",
"qc_unit": "px",
"obs_date_utc": self.dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
if "order" in self.recipeName.lower():
c = f"Number of order centre traces found"
else:
c = f"Number of orders containing an object trace"
self.qc = pd.concat(
[
self.qc,
pd.Series(
{
"soxspipe_recipe": self.recipeName,
"qc_name": "N ORDERS",
"qc_value": uniqueorders,
"qc_comment": c,
"qc_unit": None,
"obs_date_utc": self.dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
self.log.debug("completed the ``calculate_residuals`` method")
return res, res_mean, res_std, res_median, thisFit
def write_order_table_to_file(self, frame, orderPolyTable, orderMetaTable):
"""*write out the fitted polynomial solution coefficients to file*
**Key Arguments:**
- ``frame`` -- the calibration frame used to generate order location data
- ``orderPolyTable`` -- data-frames containing centre location coefficients (and possibly also order edge coeffs)
- ``orderMetaTable`` -- extra order meta data to be added in an extra FITS extension
**Return:**
- ``order_table_path`` -- path to the order table file
"""
from astropy.table import Table
from astropy.io import fits
self.log.debug("starting the ``write_order_table_to_file`` method")
arm = self.arm
kw = self.kw
if not self.sofName:
filename = filenamer(log=self.log, frame=frame, settings=self.settings)
else:
filename = self.sofName + ".fits"
filename = filename.replace("MFLAT", "FLAT")
# if self.inst.upper() == "SOXS":
# filename = filename.replace("_DLAMP", "")
# filename = filename.replace("_QLAMP", "")
if "mflat" in self.recipeName.lower():
filename = filename.upper().replace("FLAT", "OLOC").replace(".FITS", ".fits")
elif "stare" in self.recipeName.lower():
filename = filename.upper().split(".FITS")[0] + "_OBJTRACE.fits"
elif "nod" in self.recipeName.lower():
# sequence = "A" if int(frame.header['HIERARCH ESO SEQ CUMOFF Y'] > 0) else "B"
filename = filename.upper().split(".FITS")[0] + "_OBJTRACE" + self.noddingSequence + ".fits"
if self.lampTag and self.inst.upper() != "SOXS":
filename = filename.replace(".fits", f"{self.lampTag}.fits")
order_table_path = f"{self.productDir}/{filename}"
header = copy.deepcopy(frame.header)
with suppress(KeyError):
header.pop(kw("DPR_TECH"))
with suppress(KeyError):
header.pop(kw("DPR_CATG"))
with suppress(KeyError):
header.pop(kw("DPR_TYPE"))
with suppress(KeyError):
header.pop(kw("DET_READ_SPEED"))
with suppress(KeyError):
header.pop(kw("CONAD"))
with suppress(KeyError):
header.pop(kw("GAIN"))
with suppress(KeyError):
header.pop(kw("RON"))
header["HIERARCH " + kw("PRO_TECH")] = "ECHELLE,SLIT"
orderPolyTable = Table.from_pandas(orderPolyTable)
BinTableHDU = fits.table_to_hdu(orderPolyTable)
orderMetaTable = Table.from_pandas(orderMetaTable)
BinTableHDU2 = fits.table_to_hdu(orderMetaTable)
header[kw("SEQ_ARM")] = arm
header["HIERARCH " + kw("PRO_TYPE")] = "REDUCED"
if (
"stare" not in self.recipeName.lower()
and "nod" not in self.recipeName.lower()
and "offset" not in self.recipeName.lower()
):
header["HIERARCH " + kw("PRO_CATG")] = f"ORDER_TAB_{arm}".upper()
else:
header["HIERARCH " + kw("PRO_CATG")] = f"OBJECT_TAB_{arm}".upper()
priHDU = fits.PrimaryHDU(header=header)
hduList = fits.HDUList([priHDU, BinTableHDU, BinTableHDU2])
hduList.verify("fix")
hduList.writeto(order_table_path, checksum=True, overwrite=True)
self.log.debug("completed the ``write_order_table_to_file`` method")
return order_table_path
[docs]
class detect_continuum(_base_detect):
"""
*find and fit the continuum trace across all echelle orders with low-order polynomials.*
**Key Arguments:**
- ``log`` -- logger
- ``traceFrame`` -- calibrated frame containing a source trace (CCDObject)
- ``dispersion_map`` -- path to dispersion map file containing polynomial fits of the dispersion solution for the frame
- ``settings`` -- the settings dictionary
- ``recipeSettings`` -- the recipe specific settings
- ``recipeName`` -- the recipe name as given in the settings dictionary
- ``qcTable`` -- the data frame to collect measured QC metrics
- ``productsTable`` -- the data frame to collect output products
- ``sofName`` ---- name of the originating SOF file
- ``binx`` -- binning in x-axis
- ``biny`` -- binning in y-axis
- ``lampTag`` -- add this tag to the end of the product filename. Default *False*
- ``locationSetIndex`` -- the index of the AB cycle locations (nodding mode only). Default *False*
- ``orderPixelTable`` -- this is used for tuning the pipeline. Default *False*
- ``startNightDate`` -- YYYY-MM-DD date of the observation night. Default ""
- ``debug`` -- if *True* then extra debugging information is printed. Default *False*
**Usage:**
To use the ``detect_continuum`` object, use the following:
```python
from soxspipe.commonutils import detect_continuum
detector = detect_continuum(
log=log,
traceFrame=traceFrame,
dispersion_map=dispersion_map,
settings=settings,
recipeName="soxs-order-centres"
)
order_table_path = detector.get()
```
"""
def __init__(
self,
log,
traceFrame,
dispersion_map,
settings=False,
recipeSettings=False,
recipeName=False,
qcTable=False,
productsTable=False,
sofName=False,
binx=1,
biny=1,
lampTag=False,
locationSetIndex=False,
orderPixelTable=False,
startNightDate="",
debug=False,
):
self.log = log
log.debug("instantiating a new 'detect_continuum' object")
import copy
self.settings = settings
try:
self.noddingSequence = "_A" if int(traceFrame.header["HIERARCH ESO SEQ CUMOFF Y"] > 0) else "_B"
if locationSetIndex:
self.noddingSequence += str(locationSetIndex)
except:
self.noddingSequence = ""
self.recipeName = recipeName
self.traceFrame = traceFrame
self.dispersion_map = dispersion_map
self.qc = qcTable
self.products = productsTable
self.sofName = sofName
self.binx = binx
self.biny = biny
self.recipeSettings = copy.deepcopy(recipeSettings["detect-continuum"])
self.lampTag = lampTag
self.orderPixelTable = orderPixelTable
self.startNightDate = startNightDate
self.debug = debug
# KEYWORD LOOKUP OBJECT - LOOKUP KEYWORD FROM DICTIONARY IN RESOURCES
# FOLDER
self.kw = keyword_lookup(log=self.log, settings=self.settings).get
self.arm = traceFrame.header[self.kw("SEQ_ARM")]
self.dateObs = traceFrame.header[self.kw("DATE_OBS")]
self.inst = traceFrame.header[self.kw("INSTRUME")]
self.exptime = traceFrame.header[self.kw("EXPTIME")]
# if self.exptime < 59 and recipeName != "soxs-stare":
# raise Exception("too low")
# DETECTOR PARAMETERS LOOKUP OBJECT
self.detectorParams = detector_lookup(log=log, settings=settings).get(self.arm)
# SET IMAGE ORIENTATION
if self.detectorParams["dispersion-axis"] == "x":
sliceAxis = "x"
sliceAntiAxis = "y"
else:
sliceAxis = "y"
sliceAntiAxis = "x"
self.sliceAxis = sliceAxis
self.sliceAntiAxis = sliceAntiAxis
# DEG OF THE POLYNOMIALS TO FIT THE ORDER CENTRE LOCATIONS
self.axisBDeg = self.recipeSettings["disp-axis-deg"]
self.orderDeg = self.recipeSettings["order-deg"]
self.lamp = get_calibration_lamp(log=log, frame=traceFrame, kw=self.kw)
from soxspipe.commonutils.toolkit import utility_setup
self.qcDir, self.productDir = utility_setup(
log=self.log,
settings=settings,
recipeName=recipeName,
startNightDate=startNightDate,
)
# SET IMAGE ORIENTATION
if self.detectorParams["dispersion-axis"] == "x":
self.axisA = "x"
self.axisB = "y"
self.coeff_dict = {
"degorder_cent": self.orderDeg,
"degy_cent": self.axisBDeg,
}
else:
self.axisA = "y"
self.axisB = "x"
self.coeff_dict = {
"degorder_cent": self.orderDeg,
"degx_cent": self.axisBDeg,
}
return None
[docs]
def get(self):
"""
*return the order centre table filepath*
**Return:**
- ``order_table_path`` -- file path to the order centre table giving polynomial coeffs to each order fit
"""
self.log.debug("starting the ``get`` method")
import numpy as np
import pandas as pd
from datetime import datetime
arm = self.arm
coeff_dict = self.coeff_dict
if isinstance(self.orderPixelTable, bool):
orderPixelTable = self.sample_trace()
else:
orderPixelTable = self.orderPixelTable
foundLines = len(orderPixelTable.index)
self.log.print("\n\t## FINDING GLOBAL POLYNOMIAL SOLUTION FOR CONTINUUM TRACES\n")
# GET UNIQUE VALUES IN COLUMN
uniqueOrders = orderPixelTable["order"].unique()
orderLocations = {}
orderPixelTable[f"cont_{self.axisA}_fit"] = np.nan
orderPixelTable[f"cont_{self.axisA}_fit_res"] = np.nan
# ITERATIVELY FIT THE POLYNOMIAL SOLUTIONS TO THE DATA
fitFound = False
tryCount = 0
backupOrderPixelTable = orderPixelTable.copy()
while not fitFound and tryCount < 5:
# SETUP EXPONENTS AHEAD OF TIME - SAVES TIME ON POLY FITTING
for i in range(0, self.axisBDeg + 1):
orderPixelTable[f"{self.axisB}_pow_{i}"] = orderPixelTable[f"cont_{self.axisB}"].pow(i)
for i in range(0, self.orderDeg + 1):
orderPixelTable[f"order_pow_{i}"] = orderPixelTable["order"].pow(i)
try:
coeff, orderPixelTable, clippedDataCentre = self.fit_global_polynomial(
pixelList=orderPixelTable,
axisACol=f"cont_{self.axisA}",
axisBCol=f"cont_{self.axisB}",
exponentsIncluded=True,
writeQCs=True,
)
mean_res = np.mean(np.abs(orderPixelTable[f"cont_{self.axisA}_fit_res"].values))
if "order" in self.recipeName.lower() and mean_res > 2:
orderPixelTable = backupOrderPixelTable
raise AttributeError("Failed to continuum trace")
elif mean_res > 10 and not (self.inst == "SOXS" and self.arm == "VIS"):
# BAD FIT ... FORCE A FAIL
orderPixelTable = backupOrderPixelTable
raise AttributeError("Failed to continuum trace")
n_coeff = 0
for i in range(0, self.orderDeg + 1):
for j in range(0, self.axisBDeg + 1):
coeff_dict[f"cent_{i}{j}"] = coeff[n_coeff]
n_coeff += 1
# ITERATIVELY FIT THE POLYNOMIAL SOLUTIONS TO THE DATA
coeff, orderPixelTable, clippedData = self.fit_global_polynomial(
pixelList=orderPixelTable,
axisACol="gauss_stddev",
axisBCol=f"cont_{self.axisB}",
exponentsIncluded=True,
)
if len(clippedData) and len(clippedData.index):
clippedData = pd.concat([clippedDataCentre, clippedData], ignore_index=True)
else:
clippedData = clippedDataCentre
fitFound = True
except Exception as e:
degList = [self.axisBDeg, self.orderDeg]
degList[degList.index(max(degList))] -= 1
self.axisBDeg, self.orderDeg = degList
coeff_dict["degorder_cent"] = self.orderDeg
coeff_dict[f"deg{self.axisB}_cent"] = self.axisBDeg
self.log.print(
f"{self.axisB} and Order fitting orders reduced to {self.axisBDeg}, {self.orderDeg} to try and successfully fit the continuum."
)
self.recipeSettings["disp-axis-deg"] = self.axisBDeg
self.recipeSettings["order-deg"] = self.orderDeg
tryCount += 1
if tryCount == 5:
self.log.print(
f"Could not converge on a good fit to the continuum. Please check the quality of your data or adjust your fitting parameters."
)
return None, self.qc, self.products, None, None, None
if self.settings["tune-pipeline"]:
raise e
utcnow = datetime.utcnow()
utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S")
try:
nclip = len(clippedData.index)
except:
nclip = 0
self.qc = pd.concat(
[
self.qc,
pd.Series(
{
"soxspipe_recipe": self.recipeName,
"qc_name": "SAMPLES CLIP NUM",
"qc_value": nclip,
"qc_comment": "Number of continuum sample clipped during solution fitting",
"qc_unit": None,
"obs_date_utc": self.dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
pclip = nclip / foundLines if foundLines > 0 else 0
self.qc = pd.concat(
[
self.qc,
pd.Series(
{
"soxspipe_recipe": self.recipeName,
"qc_name": "SAMPLES CLIP FRAC",
"qc_value": f"{pclip:0.3f}",
"qc_comment": "Fraction of detected continuum samples clipped during solution fitting",
"qc_unit": None,
"obs_date_utc": self.dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
# orderLocations[o] = coeff
coeff_dict["degorder_std"] = self.orderDeg
coeff_dict[f"deg{self.axisB}_std"] = self.axisBDeg
n_coeff = 0
for i in range(0, self.orderDeg + 1):
for j in range(0, self.axisBDeg + 1):
coeff_dict[f"std_{i}{j}"] = coeff[n_coeff]
n_coeff += 1
coeffColumns = coeff_dict.keys()
orderPolyTable = pd.DataFrame([coeff_dict])
# HERE IS THE LINE LIST IF NEEDED FOR QC
orderPixelTable.drop(columns=["mask"], inplace=True)
# OPTIMISE SUPER: 10s 8 hits
plotPath, orderMetaTable = self.plot_results(
orderPixelTable=orderPixelTable,
orderPolyTable=orderPolyTable,
clippedData=clippedData,
)
utcnow = datetime.utcnow()
utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S")
basename = os.path.basename(plotPath)
if "order" in self.recipeName.lower():
label = "ORDER_CENTRES_RES"
product_desc = f"Residuals of the order centre polynomial fit"
else:
label = "OBJECT_TRACE_RES"
product_desc = f"Residuals of the object trace polynomial fit"
if not isinstance(self.products, bool):
self.products = pd.concat(
[
self.products,
pd.Series(
{
"soxspipe_recipe": self.recipeName,
"product_label": label + self.noddingSequence,
"file_name": basename,
"file_type": "PDF",
"obs_date_utc": self.dateObs,
"reduction_date_utc": utcnow,
"product_desc": product_desc,
"file_path": plotPath,
"label": "QC",
}
)
.to_frame()
.T,
],
ignore_index=True,
)
# WRITE OUT THE FITS TO THE ORDER CENTRE TABLE
order_table_path = self.write_order_table_to_file(
frame=self.traceFrame,
orderPolyTable=orderPolyTable,
orderMetaTable=orderMetaTable,
)
else:
order_table_path = self.write_order_table_to_file(
frame=self.traceFrame,
orderPolyTable=orderPolyTable,
orderMetaTable=orderMetaTable,
)
# mean_res = np.mean(np.abs(orderPixelTable[f'cont_{self.axisA}_fit_res'].values))
# std_res = np.std(np.abs(orderPixelTable[f'cont_{self.axisA}_fit_res'].values))
self.log.debug("completed the ``get`` method")
return (
order_table_path,
self.qc,
self.products,
orderPolyTable,
orderPixelTable,
orderMetaTable,
)
[docs]
def create_pixel_arrays(self):
"""*create a pixel array for the approximate centre of each order*
**Return:**
- ``orderPixelTable`` -- a data-frame containing lines and associated pixel locations
- `dmBinx` -- the dispersion map binning in x
- `dmBiny` -- the dispersion map binning in y
"""
self.log.debug("starting the ``create_pixel_arrays`` method")
import numpy as np
import pandas as pd
from astropy.io import fits
# READ THE SPECTRAL FORMAT TABLE TO DETERMINE THE LIMITS OF THE TRACES
orderNums, waveLengthMin, waveLengthMax, amins, amaxs = read_spectral_format(
log=self.log,
settings=self.settings,
arm=self.arm,
dispersionMap=self.dispersion_map,
)
# READ ORDER SAMPLING RESOLUTION FROM SETTINGS
sampleCount = self.recipeSettings["order-sample-count"]
orderPixelRanges = []
for o, amin, amax in zip(orderNums, amins, amaxs):
arange = amax - amin
orderPixelRanges.append(arange)
smallestRange = min(orderPixelRanges)
samplePixelSep = int(smallestRange / sampleCount)
# CREATE THE WAVELENGTH/ORDER ARRAYS TO BE CONVERTED TO PIXELS
myDict = {
"order": np.asarray([]),
"wavelength": np.asarray([]),
"slit_position": np.asarray([]),
}
for o, wmin, wmax, pixelRange in zip(orderNums, waveLengthMin, waveLengthMax, orderPixelRanges):
orderSampleCount = int(pixelRange / samplePixelSep)
wrange = wmax - wmin
wlArray = np.arange(wmin, wmax, (wmax - wmin) / orderSampleCount)
myDict["wavelength"] = np.append(myDict["wavelength"], wlArray)
myDict["order"] = np.append(myDict["order"], np.ones(len(wlArray)) * o)
myDict["slit_position"] = np.append(myDict["slit_position"], np.zeros(len(wlArray)))
orderPixelTable = pd.DataFrame(myDict)
orderPixelTable = dispersion_map_to_pixel_arrays(
log=self.log,
dispersionMapPath=self.dispersion_map,
orderPixelTable=orderPixelTable,
)
# GRAB HEADER FROM DISPERSION MAP
with fits.open(self.dispersion_map, memmap=True) as hdul:
header = hdul[0].header
try:
dmBinx = header[self.kw("WIN_BINX")]
dmBiny = header[self.kw("WIN_BINY")]
except:
dmBinx = 1
dmBiny = 1
self.log.debug("completed the ``create_pixel_arrays`` method")
return orderPixelTable, dmBinx, dmBiny
[docs]
def fit_1d_gaussian_to_slices(self, orderPixelTable, sliceLength, medianStddev=False):
"""Optimized version of Gaussian fitting to slices"""
import numpy as np
from astropy.stats import mad_std
from astropy.modeling import models, fitting
from scipy.signal import find_peaks
if not medianStddev:
medianStddev = 3.0
# Pre-allocate arrays
fitx = orderPixelTable["fit_x"].values
fity = orderPixelTable["fit_y"].values
n_points = len(fitx)
contSliceAxis = np.full(n_points, np.nan)
contSliceAntiAxis = np.full(n_points, np.nan)
gauss_amplitude = np.full(n_points, np.nan)
gauss_stddev = np.full(n_points, np.nan)
gauss_mean = np.full(n_points, np.nan)
# Create slice array once
# OPTIMISE: 1s - 6464 hits
slices = [
cut_image_slice(
log=self.log,
frame=self.traceFrame,
width=self.sliceWidth,
length=sliceLength,
x=x,
y=y,
sliceAxis=self.detectorParams["dispersion-axis"],
median=True,
debug=self.debug,
)
for x, y in zip(fitx, fity)
]
# Initialize fitter once
fit_g = fitting.LevMarLSQFitter()
# Process each slice
for i, s in enumerate(slices):
slice_data, slice_length_offset, slice_width_centre = s[0], s[1], s[2]
if slice_data is None:
continue
# Replace values < -500 with NaN
slice_data = np.where(slice_data < -50, np.nan, slice_data)
# Calculate baseline and std using vectorized operations
baseline = np.nanpercentile(slice_data, 20)
std_r = mad_std(slice_data, ignore_nan=True)
if baseline < 0:
baseline = 0
if baseline is None:
continue
# Find peaks using vectorized operations
peaks, _ = find_peaks(slice_data, height=baseline + self.peakSigmaLimit * std_r, width=1)
if len(peaks) == 0:
continue
if len(peaks) > 1:
# Vectorized closest peak finding
peak_diffs = np.abs(peaks - sliceLength / 2.0)
peaks = [peaks[np.argmin(peak_diffs)]]
# Initialize and fit Gaussian
# OPTIMISE: 1s
g_init = models.Gaussian1D(amplitude=slice_data[peaks[0]], mean=peaks[0], stddev=medianStddev)
try:
mask = np.isfinite(slice_data)
x_range = np.arange(len(slice_data))
# OPTIMISE: 7s - 3185 hits
g = fit_g(g_init, x_range[mask], slice_data[mask])
if 0 <= g.mean.value <= sliceLength and g.stddev.value > 0.5:
contSliceAxis[i] = g.mean.value + max(0, slice_length_offset)
contSliceAntiAxis[i] = slice_width_centre
gauss_amplitude[i] = g.amplitude.value
gauss_stddev[i] = g.stddev.value
gauss_mean[i] = g.mean.value
except:
continue
# Assign results back to DataFrame using vectorized operations
orderPixelTable[f"cont_{self.sliceAxis}"] = contSliceAxis
orderPixelTable[f"cont_{self.sliceAntiAxis}"] = contSliceAntiAxis
orderPixelTable["gauss_amplitude"] = gauss_amplitude
orderPixelTable["gauss_stddev"] = gauss_stddev
orderPixelTable["gauss_mean"] = gauss_mean
return orderPixelTable
[docs]
def plot_results(self, orderPixelTable, orderPolyTable, clippedData):
"""*generate a plot of the polynomial fits and residuals*
**Key Arguments:**
- ``orderPixelTable`` -- the pixel table with residuals of fits
- ``orderPolyTable`` -- data-frame of order-location polynomial coeff
- ``clippedData`` -- the sigma-clipped data
**Return:**
- ``filePath`` -- path to the plot pdf
- ``orderMetaTable`` -- dataframe of useful order fit metadata
"""
self.log.debug("starting the ``plot_results`` method")
import numpy as np
import pandas as pd
from soxspipe.commonutils.toolkit import qc_settings_plot_tables
import matplotlib.pyplot as plt
arm = self.arm
# ROTATE THE IMAGE FOR BETTER LAYOUT
rotateImage = self.detectorParams["rotate-qc-plot"]
flipImage = self.detectorParams["flip-qc-plot"]
# ROTATE THE IMAGE FOR BETTER LAYOUT
rotatedImg = self.traceFrame.data
if rotateImage:
rotatedImg = np.rot90(rotatedImg, rotateImage / 90)
if flipImage:
rotatedImg = np.flipud(rotatedImg)
if not rotateImage:
aLen = rotatedImg.shape[0]
orderPixelTable[f"cont_{self.axisA}"] = aLen - orderPixelTable[f"cont_{self.axisA}"]
clippedData[f"cont_{self.axisA}"] = aLen - clippedData[f"cont_{self.axisA}"]
clippedData[f"fit_{self.axisA}"] = aLen - clippedData[f"fit_{self.axisA}"]
if rotatedImg.shape[0] / rotatedImg.shape[1] > 0.8:
fig = plt.figure(figsize=(6, 13.5), constrained_layout=True)
# CREATE THE GID OF AXES
gs = fig.add_gridspec(7, 4)
toprow = fig.add_subplot(gs[0:2, :])
midrow = fig.add_subplot(gs[2:4, :])
bottomleft = fig.add_subplot(gs[4:5, 0:2])
bottomright = fig.add_subplot(gs[4:5, 2:])
fwhmaxis = fig.add_subplot(gs[5:6, :])
settingsAx = fig.add_subplot(gs[6:, 2:])
qcAx = fig.add_subplot(gs[6:, 0:2])
else:
fig = plt.figure(figsize=(6, 19), constrained_layout=True)
# CREATE THE GID OF AXES
gs = fig.add_gridspec(10, 4)
toprow = fig.add_subplot(gs[0:2, :])
midrow = fig.add_subplot(gs[2:4, :])
bottomleft = fig.add_subplot(gs[4:6, 0:2])
bottomright = fig.add_subplot(gs[4:6, 2:])
fwhmaxis = fig.add_subplot(gs[6:7, :])
settingsAx = fig.add_subplot(gs[7:, 2:])
qcAx = fig.add_subplot(gs[7:, 0:2])
toprow.imshow(rotatedImg, vmin=10, vmax=50, cmap="gray", alpha=0.5)
midrow.imshow(rotatedImg, vmin=10, vmax=50, cmap="gray", alpha=0.5)
toprow.set_title("1D gaussian peak positions (post-clipping)", fontsize=10)
toprow.scatter(
orderPixelTable[f"cont_{self.axisB}"],
orderPixelTable[f"cont_{self.axisA}"],
marker="o",
c="green",
s=0.3,
alpha=0.6,
label="cross-dispersion 1D gaussian peak-position",
)
# PLOT WHERE A CONTINUUM WAS NOT FOUND - WITH SLICE LENGTH SHOWN
mask = clippedData["cont_x"].isna() | clippedData["cont_y"].isna()
for axB, axA in zip(clippedData.loc[mask, f"fit_{self.axisB}"], clippedData.loc[mask, f"fit_{self.axisA}"]):
toprow.plot(
[axB, axB],
[axA - int(self.sliceLength / 2), axA + int(self.sliceLength / 2)],
color="black",
linewidth=0.5,
alpha=0.3,
label="continuum not found" if axB == clippedData.loc[mask, f"fit_{self.axisB}"].iloc[0] else None,
)
toprow.scatter(
clippedData[f"cont_{self.axisB}"],
clippedData[f"cont_{self.axisA}"],
marker="x",
c="red",
s=5,
alpha=0.6,
linewidths=0.5,
label="peaks clipped during continuum fitting",
)
# Put a legend below current axis
if arm == "VIS":
toprow.legend(loc="upper right", bbox_to_anchor=(1.0, -0.2), fontsize=4)
else:
toprow.legend(loc="upper right", bbox_to_anchor=(1.0, -0.1), fontsize=4)
# toprow.set_yticklabels([])
# toprow.set_xticklabels([])
toprow.set_ylabel(f"{self.axisA}-axis", fontsize=12)
toprow.set_xlabel(f"{self.axisB}-axis", fontsize=12)
toprow.tick_params(axis="both", which="major", labelsize=9)
toprow.set_xlim([0, rotatedImg.shape[1]])
if self.detectorParams["dispersion-axis"] == "x":
toprow.invert_yaxis()
midrow.invert_yaxis()
toprow.set_ylim([0, rotatedImg.shape[0]])
midrow.set_ylim([0, rotatedImg.shape[0]])
if "order" in self.recipeName.lower():
midrow.set_title("order-location fit solutions", fontsize=10)
else:
midrow.set_title("global polynomal fit of order-centres", fontsize=10)
if self.axisB == "y":
axisALength = self.traceFrame.data.shape[1]
axisBLength = self.traceFrame.data.shape[0]
elif self.axisB == "x":
axisALength = self.traceFrame.data.shape[0]
axisBLength = self.traceFrame.data.shape[1]
axisBlinelist = np.arange(0, axisBLength, 3)
poly = chebyshev_order_xy_polynomials(
log=self.log,
axisBCol=self.axisB,
orderCol="order",
orderDeg=self.orderDeg,
axisBDeg=self.axisBDeg,
).poly
for index, row in orderPolyTable.iterrows():
cent_coeff = [float(v) for k, v in row.items() if "cent_" in k]
std_coeff = [float(v) for k, v in row.items() if "std_" in k]
uniqueOrders = orderPixelTable["order"].unique()
# CREATE DATA FRAME FROM A DICTIONARY OF LISTS
myDict = {f"{self.axisB}": axisBlinelist}
df = pd.DataFrame(myDict)
ymin = []
ymax = []
xmin = []
xmax = []
foundOrders = []
colors = []
labelAdded = None
for o in uniqueOrders:
df["order"] = o
xfit = poly(df, *cent_coeff)
stdfit = poly(df, *std_coeff)
try:
xfit, yfit, stdfit, lower, upper = zip(
*[
(x, y, std, x - 3 * std, x + 3 * std)
for x, y, std in zip(xfit, axisBlinelist, stdfit)
if x > 0 and x < (axisALength) - 10
]
)
except:
continue
if flipImage and not rotateImage:
xfit = aLen - np.array(xfit)
lower = aLen - np.array(lower)
upper = aLen - np.array(upper)
foundOrders.append(o)
# lower = xfit - 3 * stdfit
# upper = xfit + 3 * stdfit
if labelAdded == None:
label1 = "$3\\sigma$ deviation"
label2 = "polynomial fit"
labelAdded = True
else:
label1 = None
label2 = None
c = midrow.plot(yfit, xfit, linewidth=0.7, label=label2)
midrow.fill_between(yfit, lower, upper, color=c[0].get_color(), alpha=0.3, label=label1)
colors.append(c[0].get_color())
ymin.append(min(yfit))
ymax.append(max(yfit))
xmin.append(axisALength - max(xfit))
xmax.append(axisALength - min(xfit))
try:
midrow.text(
yfit[10],
xfit[10] + 10,
int(o),
fontsize=6,
c=c[0].get_color(),
verticalalignment="bottom",
)
except:
pass
# CREATE DATA FRAME FROM A DICTIONARY OF LISTS
orderMetaTable = {
"order": foundOrders,
f"{self.axisB}min": ymin,
f"{self.axisB}max": ymax,
f"{self.axisA}min": xmin,
f"{self.axisA}max": xmax,
}
orderMetaTable = pd.DataFrame(orderMetaTable)
# xfit = np.ones(len(xfit)) * \
# self.pinholeFrame.data.shape[1] - xfit
# midrow.scatter(yfit, xfit, marker='x', c='blue', s=4)
# midrow.set_yticklabels([])
# midrow.set_xticklabels([])
midrow.set_ylabel(f"{self.axisA}-axis", fontsize=12)
midrow.set_xlabel(f"{self.axisB}-axis", fontsize=12)
midrow.tick_params(axis="both", which="major", labelsize=9)
if arm == "VIS":
midrow.legend(loc="upper right", bbox_to_anchor=(1.0, -0.2), fontsize=4)
else:
midrow.legend(loc="upper right", bbox_to_anchor=(1.0, -0.1), fontsize=4)
# PLOT THE FINAL RESULTS:
plt.subplots_adjust(top=0.92)
for o, c in zip(uniqueOrders, colors):
mask = orderPixelTable["order"] == o
bottomleft.scatter(
orderPixelTable.loc[mask][f"cont_{self.axisA}"].values,
orderPixelTable.loc[mask][f"cont_{self.axisA}_fit_res"].values,
alpha=0.2,
s=1,
c=c,
)
if len(orderPixelTable.loc[mask].index) > 10:
labelIndex = 10
else:
labelIndex = 1
try:
bottomleft.text(
orderPixelTable.loc[mask][f"cont_{self.axisA}"].values[labelIndex],
orderPixelTable.loc[mask][f"cont_{self.axisA}_fit_res"].values[labelIndex],
int(o),
fontsize=8,
c=c,
verticalalignment="bottom",
)
except:
pass
bottomleft.set_xlabel(f"{self.axisA} pixel position", fontsize=10)
bottomleft.set_ylabel(f"{self.axisA} residual", fontsize=10)
bottomleft.tick_params(axis="both", which="major", labelsize=9)
# PLOT THE FINAL RESULTS:
plt.subplots_adjust(top=0.92)
for o, c in zip(uniqueOrders, colors):
mask = orderPixelTable["order"] == o
bottomright.scatter(
orderPixelTable.loc[mask][f"cont_{self.axisB}"].values,
orderPixelTable.loc[mask][f"cont_{self.axisA}_fit_res"].values,
alpha=0.2,
s=1,
c=c,
)
if len(orderPixelTable.loc[mask].index) > 10:
labelIndex = 10
else:
labelIndex = 1
try:
bottomright.text(
orderPixelTable.loc[mask][f"cont_{self.axisB}"].values[labelIndex],
orderPixelTable.loc[mask][f"cont_{self.axisA}_fit_res"].values[labelIndex],
int(o),
fontsize=8,
c=c,
verticalalignment="bottom",
)
except:
pass
bottomright.set_xlabel(f"{self.axisB} pixel position", fontsize=10)
bottomright.tick_params(axis="both", which="major", labelsize=9)
# bottomright.set_ylabel('x residual')
bottomright.set_yticklabels([])
stdToFwhm = 2 * (2 * math.log(2)) ** 0.5
for o, c in zip(uniqueOrders, colors):
mask = orderPixelTable["order"] == o
fwhmaxis.scatter(
orderPixelTable.loc[mask]["wavelength"].values,
orderPixelTable.loc[mask]["gauss_stddev_fit"].values * stdToFwhm,
alpha=0.2,
s=1,
c=c,
)
if len(orderPixelTable.loc[mask].index) > 10:
labelIndex = 10
else:
labelIndex = 1
try:
fwhmaxis.text(
orderPixelTable.loc[mask]["wavelength"].values[labelIndex],
orderPixelTable.loc[mask]["gauss_stddev_fit"].values[labelIndex] * stdToFwhm,
int(o),
fontsize=8,
c=c,
verticalalignment="bottom",
)
except:
pass
fwhmaxis.set_xlabel("wavelength (nm)", fontsize=10)
fwhmaxis.set_ylabel("Cross-dispersion\nFWHM (pixels)", fontsize=10)
fwhmaxis.set_ylim(
orderPixelTable["gauss_stddev_fit"].min() * stdToFwhm * 0.5,
orderPixelTable["gauss_stddev_fit"].max() * stdToFwhm * 1.2,
)
# REMOVE DUPLICATE ENTRIES IN COLUMN 'qc_name' AND KEEP THE LAST ENTRY
self.qc = self.qc.drop_duplicates(subset=["qc_name"], keep="last")
qc_settings_plot_tables(
log=self.log,
qc=self.qc,
qcAx=qcAx,
settings={**self.recipeSettings, **{"exptime": self.exptime}},
settingsAx=settingsAx,
)
mean_res = np.mean(np.abs(orderPixelTable[f"cont_{self.axisA}_fit_res"].values))
std_res = np.std(orderPixelTable[f"cont_{self.axisA}_fit_res"].values)
res_min = np.min(orderPixelTable[f"cont_{self.axisA}_fit_res"].values)
res_max = np.max(orderPixelTable[f"cont_{self.axisA}_fit_res"].values)
res_range = res_max - res_min
subtitle = f"mean res: {mean_res:2.2f} pix, res stdev: {std_res:2.2f}"
lamp = ""
if self.lamp:
lamp = f" {self.lamp} lamp"
if "order" in self.recipeName.lower():
fig.suptitle(
f"traces of order-centre locations - {arm}{lamp} pinhole flat-frame\n{subtitle}",
fontsize=12,
y=0.99,
)
else:
fig.suptitle(f"{arm} object trace locations\n{subtitle}", fontsize=12)
polyOrders = [self.orderDeg, self.axisBDeg]
polyOrders[:] = [str(l) for l in polyOrders]
polyOrders = "".join(polyOrders)
# plt.show()
if not self.sofName:
filename = filenamer(log=self.log, frame=self.traceFrame, settings=self.settings)
filename = filename.split("FLAT")[0] + "ORDER_CENTRES_residuals.pdf"
elif "order" in self.recipeName.lower():
filename = self.sofName + f"_residuals_{polyOrders}.pdf"
elif "nod" in self.recipeName.lower():
filename = self.sofName + "_OBJECT_TRACE_residuals" + self.noddingSequence + f"_{polyOrders}.pdf"
else:
filename = self.sofName + f"_OBJECT_TRACE_residuals_{polyOrders}.pdf"
filePath = f"{self.qcDir}/{filename}"
plt.tight_layout()
if not self.settings["tune-pipeline"]:
plt.savefig(filePath, dpi=120, format="pdf")
if self.debug:
plt.show()
plt.close(fig)
plt.close("all")
# REDUCE MEMORY USAGE
cleanUp = [toprow, midrow, bottomleft, bottomright, fwhmaxis, settingsAx, qcAx]
for item in cleanUp:
try:
item.cla()
del item
except:
pass
if self.settings["tune-pipeline"]:
import codecs
filePath = f"residuals.txt"
exists = os.path.exists(filePath)
if not exists:
with codecs.open(filePath, encoding="utf-8", mode="w") as writeFile:
writeFile.write(f"polyOrders,mean_res,std_res,res_min,res_max,res_range \n")
with codecs.open(filePath, encoding="utf-8", mode="a") as writeFile:
writeFile.write(
f"{polyOrders},{mean_res:2.4f},{std_res:2.4f},{res_min:2.4f},{res_max:2.4f},{res_range:2.4f}\n"
)
self.log.debug("completed the ``plot_results`` method")
return filePath, orderMetaTable
[docs]
def sample_trace(self):
"""*take many cross-dispersion samples across each order to try and find an object trace*
**Return:**
- ``orderPixelTable`` -- the detector locations at which a trace was found
"""
self.log.debug("starting the ``sample_trace`` method")
import pandas as pd
import numpy as np
# CONVERT WAVELENGTH TO PIXEL POSITIONS AND RETURN ARRAY OF POSITIONS TO
# SAMPLE THE TRACES
orderPixelTable, dmBinx, dmBiny = self.create_pixel_arrays()
binx = 1
biny = 1
try:
binx = self.traceFrame.header[self.kw("WIN_BINX")] / dmBinx
biny = self.traceFrame.header[self.kw("WIN_BINY")] / dmBiny
except:
pass
# FIT_X AND FIT_Y FROM DISP-SOLUTION
orderPixelTable["fit_x"] = orderPixelTable["fit_x"] / binx
orderPixelTable["fit_y"] = orderPixelTable["fit_y"] / biny
# SLICE LENGTH TO SAMPLE TRACES IN THE CROSS-DISPERSION DIRECTION
self.sliceLength = self.recipeSettings["slice-length"]
self.peakSigmaLimit = self.recipeSettings["peak-sigma-limit"]
self.sliceWidth = self.recipeSettings["slice-width"]
if (
self.kw("DPR_TYPE").upper() in self.traceFrame.header
and "STD" in self.traceFrame.header[self.kw("DPR_TYPE")].upper()
and self.sliceWidth > 3
):
self.sliceWidth = 3
# PREP LISTS WITH NAN VALUE IN CONT_X AND CONT_Y BEFORE FITTING
orderPixelTable[f"cont_{self.axisA}"] = np.nan
orderPixelTable[f"cont_{self.axisB}"] = np.nan
# FOR EACH ORDER, FOR EACH PIXEL POSITION SAMPLE, FIT A 1D GAUSSIAN IN
# CROSS-DISPERSION DIRECTION. RETURN PEAK POSITIONS
from soxspipe.commonutils.toolkit import quicklook_image
quicklook_image(
log=self.log,
CCDObject=self.traceFrame,
show=self.debug,
ext="data",
stdWindow=3,
title=False,
surfacePlot=False,
)
if "order" in self.recipeName.lower():
self.log.print("\n# FINDING & FITTING ORDER-CENTRE CONTINUUM TRACES\n")
else:
self.log.print("\n# FINDING & FITTING OBJECT CONTINUUM TRACES\n")
def find_centre_points(orderPixelTable, medianShift=False, medianStddev=False):
"""*find the central peak of the continuum trace*"""
from astropy.stats import sigma_clip, mad_std
import numpy as np
# UNIQUE ORDERS
uniqueOrders = orderPixelTable["order"].unique()
uniqueOrders = "&".join(map(str, sorted(uniqueOrders)))
sliceLength = self.sliceLength
if np.isnan(medianShift):
sliceLength = sliceLength * 1.2
shifted = ""
elif medianShift:
shifted = f"\nShifted by {medianShift:2.1f} pixels"
orderPixelTable[f"fit_{self.axisA}"] = orderPixelTable[f"fit_{self.axisA}"] - medianShift
sliceLength = sliceLength - int(abs(medianShift))
if sliceLength > medianStddev * 5 + 5:
sliceLength = int(medianStddev * 5 + 5)
if sliceLength < 7:
sliceLength = 7
else:
shifted = ""
sliceLengthStr = f"\nSlit Length = {str(sliceLength)} pixels"
orderPixelTable = self.fit_1d_gaussian_to_slices(
orderPixelTable=orderPixelTable,
sliceLength=sliceLength,
medianStddev=medianStddev,
)
if "pre-clipped" not in orderPixelTable.columns:
orderPixelTable["pre-clipped"] = False
# FILTER DATA FRAME
# FIRST CREATE THE MASK
mask = orderPixelTable["cont_x"] < 0
orderPixelTable.loc[mask, "cont_x"] = np.nan
orderPixelTable.loc[mask, "pre-clipped"] = True
mask = orderPixelTable["cont_y"] < 0
orderPixelTable.loc[mask, "cont_y"] = np.nan
orderPixelTable.loc[mask, "pre-clipped"] = True
mask = orderPixelTable["gauss_stddev"] < 0.3
orderPixelTable.loc[mask, "pre-clipped"] = True
increaseSlitLength = False
for o in orderPixelTable["order"].unique():
mask = (orderPixelTable["order"] == o) & (orderPixelTable["pre-clipped"] == False)
if len(orderPixelTable.loc[mask].index) < 5:
increaseSlitLength = True
continue
# FIND HOW FAR AWAY FROM THE CENTRE THE CONTINUUM WAS FOUND
orderPixelTable["centre_shift"] = (
orderPixelTable[f"fit_{self.axisA}"] - orderPixelTable[f"cont_{self.axisA}"]
)
# SIGMA-CLIP THE DATA ON STDDEV
if "centre" not in self.recipeName:
mask = orderPixelTable["pre-clipped"] == False
masked_residuals = sigma_clip(
orderPixelTable.loc[mask]["gauss_stddev"],
sigma_lower=3,
sigma_upper=3,
maxiters=5,
cenfunc="mean",
stdfunc="std",
)
orderPixelTable.loc[mask, "pre-clipped"] = masked_residuals.mask
mask = orderPixelTable["pre-clipped"] == False
masked_residuals = sigma_clip(
orderPixelTable.loc[mask]["centre_shift"],
sigma_lower=3,
sigma_upper=3,
maxiters=5,
cenfunc="mean",
stdfunc="std",
)
orderPixelTable.loc[mask, "pre-clipped"] = masked_residuals.mask
else:
orderPixelTable["pre-clipped"] = False
mask = orderPixelTable["pre-clipped"] == False
filteredDf = orderPixelTable.loc[mask]
medianShift = filteredDf["centre_shift"].median()
medianStddev1 = filteredDf["gauss_stddev"].median()
from astropy.stats import sigma_clipped_stats
mean1, medianShift, std1 = sigma_clipped_stats(
filteredDf["centre_shift"].values, sigma=2.5, stdfunc="std", cenfunc="mean", maxiters=5
)
mean2, medianStddev, std2 = sigma_clipped_stats(
filteredDf["gauss_stddev"].values, sigma=2.5, stdfunc="std", cenfunc="mean", maxiters=5
)
if not std1 or ((abs(medianShift) + 2) / std1 < 6 and medianStddev < 1):
medianShift = False
medianStddev = False
elif (abs(medianShift)) < 1.0:
medianShift = False
elif medianStddev < 1:
medianStddev = False
if self.debug:
import matplotlib.pyplot as plt
plt.figure(figsize=(8, 5))
plt.scatter(
orderPixelTable["wavelength"],
orderPixelTable["centre_shift"],
marker="o",
s=30,
label=f"Median Shift={medianShift:2.1f}, Median STD={medianStddev:2.1f}",
)
# LABEL X and Y AXES
plt.xlabel("Wavelength (nm)", fontsize=12)
plt.ylabel(f"{self.axisA} centre shift (pixels)", fontsize=12)
plt.title(
f"{self.arm} continuum trace centre shifts (Orders {uniqueOrders}){sliceLengthStr}{shifted} FINDCENTRE",
fontsize=14,
)
filteredDf = orderPixelTable.loc[~mask]
plt.scatter(
filteredDf["wavelength"],
filteredDf["centre_shift"],
marker="x",
s=30,
)
plt.legend()
plt.show()
if self.debug:
import matplotlib.pyplot as plt
plt.figure(figsize=(8, 5))
plt.scatter(
orderPixelTable["wavelength"],
orderPixelTable["gauss_stddev"],
marker="o",
s=30,
label=f"Median Shift={medianShift:2.1f}, Median STD={medianStddev:2.1f}",
)
# LABEL X and Y AXES
plt.xlabel("Wavelength (nm)", fontsize=12)
plt.ylabel("Gaussian stddev (pixels)", fontsize=12)
plt.title(
f"{self.arm} continuum trace Gaussian stddevs (Orders {uniqueOrders}){sliceLengthStr}{shifted} FINDCENTRE",
fontsize=14,
)
mask = orderPixelTable["pre-clipped"] == True
filteredDf = orderPixelTable.loc[mask]
plt.scatter(
filteredDf["wavelength"],
filteredDf["gauss_stddev"],
marker="x",
s=30,
)
plt.legend()
plt.show()
# DROP THE MASKED PEAKS?
if False:
mask = orderPixelTable["pre-clipped"] == False
orderPixelTable = orderPixelTable.loc[mask]
# REMOVE COLUMN FROM DATA FRAME
# orderPixelTable.drop(columns=['mask'], inplace=True)
if increaseSlitLength:
medianShift = np.nan
return orderPixelTable, medianShift, medianStddev
allLines = len(orderPixelTable.index)
tmpOrderPixelTable = orderPixelTable.copy()
everyN = 10
sanityCheck = False
while not sanityCheck:
if self.inst.upper() == "SOXS" and self.arm.upper() == "VIS":
# TREAT ORDERS 2&3 SEPARATELY FROM 1&4 THEN RECOMBINE THE orderPixelTable AT THE END
mask_23 = tmpOrderPixelTable["order"].isin([2, 3])
mask_14 = tmpOrderPixelTable["order"].isin([1, 4])
orderPixelTable_23 = tmpOrderPixelTable.loc[mask_23].copy()
orderPixelTable_14 = tmpOrderPixelTable.loc[mask_14].copy()
# PROCESS 2 & 3
junk, medianShift_23, medianStddev_23 = find_centre_points(
orderPixelTable=orderPixelTable_23.iloc[::everyN], medianShift=False, medianStddev=False
)
if np.isnan(medianShift_23):
junk, medianShift_23, medianStddev_23 = find_centre_points(
orderPixelTable=orderPixelTable_23.iloc[::everyN],
medianShift=medianShift_23,
medianStddev=False,
)
if (not (medianShift_23) or np.isnan(medianShift_23)) and everyN > 1:
sanityCheck = False
everyN = 1
# print("Reducing everyN to 1")
continue
orderPixelTable_23, medianShift_23, medianStddev_23 = find_centre_points(
orderPixelTable=orderPixelTable_23,
medianShift=medianShift_23,
medianStddev=medianStddev_23,
)
# PROCESS 1 & 4
junk, medianShift_14, medianStddev_14 = find_centre_points(
orderPixelTable=orderPixelTable_14.iloc[::everyN], medianShift=False, medianStddev=False
)
if np.isnan(medianShift_14):
junk, medianShift_14, medianStddev_14 = find_centre_points(
orderPixelTable=orderPixelTable_14.iloc[::everyN],
medianShift=medianShift_14,
medianStddev=False,
)
if (not (medianShift_14) or np.isnan(medianShift_14)) and everyN > 1:
sanityCheck = False
everyN = 1
# print("Reducing everyN to 1")
continue
orderPixelTable_14, medianShift_14, medianStddev_14 = find_centre_points(
orderPixelTable=orderPixelTable_14,
medianShift=medianShift_14,
medianStddev=medianStddev_14,
)
# RECOMBINE
orderPixelTable = pd.concat([orderPixelTable_23, orderPixelTable_14], ignore_index=True)
else:
tmpOrderPixelTable = orderPixelTable.copy()
junk, medianShift, medianStddev = find_centre_points(
orderPixelTable=tmpOrderPixelTable.iloc[::everyN], medianShift=False, medianStddev=False
)
if np.isnan(medianShift):
junk, medianShift, tmpOrderPixelTable = find_centre_points(
orderPixelTable=tmpOrderPixelTable.iloc[::everyN], medianShift=medianShift, medianStddev=False
)
if not (medianShift) and everyN > 1:
sanityCheck = False
everyN = 1
# print("Reducing everyN to 1")
continue
if np.isnan(medianShift):
self.log.error("FLUX IN THE INPUT FLAT FRAMES IS TOO LOW TO PROCEED. PLEASE CHECK THE RAW FRAMES")
raise ValueError("FLUX IN THE INPUT FLAT FRAMES IS TOO LOW TO PROCEED. PLEASE CHECK THE RAW FRAMES")
orderPixelTable, medianShift, medianStddev = find_centre_points(
orderPixelTable=tmpOrderPixelTable,
medianShift=medianShift,
medianStddev=medianStddev,
)
sanityCheck = True
# MASK orderPixelTable WHERE cont_x or cont_y is NaN
mask = orderPixelTable["cont_x"].isna() | orderPixelTable["cont_y"].isna()
foundLines = len(orderPixelTable.loc[~mask].index)
percent = 100 * foundLines / allLines
utcnow = datetime.utcnow()
utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S")
self.qc = pd.concat(
[
self.qc,
pd.Series(
{
"soxspipe_recipe": self.recipeName,
"qc_name": "SAMPLES TOT NUM",
"qc_value": allLines,
"qc_comment": "Total number of samples along orders",
"qc_unit": None,
"obs_date_utc": self.dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
self.qc = pd.concat(
[
self.qc,
pd.Series(
{
"soxspipe_recipe": self.recipeName,
"qc_name": "SAMPLES DET NUM",
"qc_value": foundLines,
"qc_comment": "Number of samples where a continuum is detected",
"qc_unit": None,
"obs_date_utc": self.dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
self.psamp = percent / 100
self.qc = pd.concat(
[
self.qc,
pd.Series(
{
"soxspipe_recipe": self.recipeName,
"qc_name": "SAMPLES DET FRAC",
"qc_value": f"{self.psamp:0.3f}",
"qc_comment": "Proportion of samples where a continuum is detected",
"qc_unit": None,
"obs_date_utc": self.dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
self.log.print(f"\tContinuum found in {foundLines} out of {allLines} order slices ({percent:2.0f}%)")
self.log.debug("completed the ``sample_trace`` method")
return orderPixelTable
# use the tab-trigger below for new method
# xt-class-method