#!/usr/bin/env python
# encoding: utf-8
"""
*small reusable functions used throughout soxspipe*
Author
: David Young
Date Created
: September 18, 2020
"""
from os.path import expanduser
from soxspipe.commonutils import detector_lookup
from datetime import datetime, UTC
from soxspipe.commonutils import keyword_lookup
from soxspipe.commonutils.polynomials import chebyshev_xy_polynomial, chebyshev_order_xy_polynomials
from fundamentals import tools
from builtins import object
import sys
from soxspipe.commonutils.dispersion_map_to_pixel_arrays import dispersion_map_to_pixel_arrays
import os
from line_profiler import profile
os.environ["TERM"] = "vt100"
[docs]
def cut_image_slice(log, frame, width, length, x, y, sliceAxis="x", median=False, debug=False):
"""*cut and return an N-pixel wide and M-pixels long slice, centred on a given coordinate from an image frame*
**Key Arguments:**
- ``log`` -- logger
- ``frame`` -- the data array to cut the slice from (masked array)
- ``width`` -- width of the slice (odd number)
- ``length`` -- length of the slice
- ``x`` -- x-coordinate
- ``y`` -- y-coordinate
- ``sliceAxis`` -- the axis along which slice is to be taken. Default *x*
- ``median`` -- collapse the slice to a median value across its width
- ``debug`` -- generate a plot of slice. Useful for debugging.
**Usage:**
```python
from soxspipe.commonutils.toolkit import cut_image_slice
slice = cut_image_slice(log=self.log, frame=self.pinholeFlat.data,
width=1, length=sliceLength, x=x_fit, y=y_fit, plot=False)
if slice is None:
return None
```
"""
log.debug("starting the ``cut_image_slice`` function")
import numpy.ma as ma
import numpy as np
import random
halfSlice = length / 2
# NEED AN EVEN PIXEL SIZE
if (width % 2) != 0:
halfwidth = (width - 1) / 2
else:
halfwidth = width / 2
if sliceAxis == "x":
axisA = x
axisB = y
axisALen = frame.shape[1]
axisBLen = frame.shape[0]
elif sliceAxis == "y":
axisB = x
axisA = y
axisALen = frame.shape[0]
axisBLen = frame.shape[1]
else:
raise ValueError("sliceAxis needs to be either 'x' or 'y'")
# CHECK WE ARE NOT GOING BEYOND BOUNDS OF FRAME
if (axisA > axisALen - halfSlice) or (axisB > axisBLen - halfwidth) or (axisA < halfSlice) or (axisB < halfwidth):
return None, None, None
slice_length_offset = int(axisA - halfSlice)
if sliceAxis == "x":
sliceFull = frame[
int(axisB - halfwidth) : int(axisB + halfwidth + 1), slice_length_offset : int(axisA + halfSlice)
]
else:
sliceFull = frame[
slice_length_offset : int(axisA + halfSlice), int(axisB - halfwidth) : int(axisB + halfwidth + 1)
]
slice_width_centre = (int(axisB + halfwidth + 1) + int(axisB - halfwidth)) / 2
# # FORCE CONVERSION OF CCDData OBJECT TO NUMPY ARRAY
# maskedDataArray = np.ma.array(sliceFull.data, mask=sliceFull.mask)
# try:
# sliceFull.data=sliceFull.data - np.percentile(maskedDataArray, 30)
# except:
# pass
if median:
if sliceAxis == "y":
slice = ma.median(sliceFull, axis=1)
else:
slice = ma.median(sliceFull, axis=0)
if False and debug and random.randint(1, 101) < 5:
import matplotlib.pyplot as plt
# CHECK THE SLICE POINTS IF NEEDED
if sliceAxis == "y":
sliceImg = np.rot90(sliceFull, 1)
else:
sliceImg = sliceFull
plt.imshow(sliceImg)
plt.show()
xx = np.arange(0, len(slice))
plt.figure(figsize=(8, 5))
if sliceAxis == "y":
plt.plot(xx, slice, "ko", label=f"x={axisB}, y={axisA}, sliceAxis={sliceAxis}")
if sliceAxis == "x":
plt.plot(xx, slice, "ko", label=f"x={axisA}, y={axisB}, sliceAxis={sliceAxis}")
plt.xlabel("Position")
plt.ylabel("Flux")
plt.legend()
plt.show()
log.debug("completed the ``cut_image_slice`` function")
return slice, slice_length_offset, slice_width_centre
[docs]
def quicklook_image(
log,
CCDObject,
show=True,
ext="data",
stdWindow=3,
title=False,
surfacePlot=False,
dispMap=False,
dispMapImage=False,
inst=False,
settings=False,
skylines=False,
saveToPath=False,
):
"""*generate a quicklook image of a CCDObject - useful for development/debugging*
**Key Arguments:**
- ``log`` -- logger
- ``CCDObject`` -- the CCDObject to plot
- ``show`` -- show the image. Set to False to skip
- ``ext`` -- the name of the the extension to show. Can be "data", "mask" or "err". Default "data".
- ``title`` -- give a title for the plot
- ``surfacePlot`` -- plot as a 3D surface plot
- ``dispMap`` -- path to dispersion map. Default *False*
- ``dispMapImage`` -- the 2D dispersion map image
- ``inst`` -- provide instrument name if no header exists
- ``skylines`` -- mark skylines on image
```python
from soxspipe.commonutils.toolkit import quicklook_image
quicklook_image(
log=self.log, CCDObject=myframe, show=True)
```
"""
log.debug("starting the ``quicklook_image`` function")
if not show and not saveToPath:
return
import pandas as pd
import matplotlib as mpl
import numpy as np
from copy import copy
from soxspipe.commonutils.toolkit import twoD_disp_map_image_to_dataframe
from soxspipe.commonutils import keyword_lookup
from soxspipe.commonutils import detector_lookup
originalRC = dict(mpl.rcParams)
import matplotlib.pyplot as plt
if settings:
# KEYWORD LOOKUP OBJECT - LOOKUP KEYWORD FROM DICTIONARY IN RESOURCES
# FOLDER
kw = keyword_lookup(log=log, settings=settings).get
arm = CCDObject.header[kw("SEQ_ARM")]
dateObs = CCDObject.header[kw("DATE_OBS")]
# DETECTOR PARAMETERS LOOKUP OBJECT
detectorParams = detector_lookup(log=log, settings=settings).get(arm)
# USE THIS ELSEWHERE IN THE OBJECT METHODS
dp = detectorParams
science_pixels = dp["science-pixels"]
if ext == "data":
frame = CCDObject.data
elif ext == "mask":
frame = CCDObject.mask
elif ext == "uncertainty":
frame = CCDObject.uncertainty.array
else:
# ASSUME ONLY NDARRAY
frame = CCDObject
if inst is False:
try:
inst = CCDObject.header["INSTRUME"]
except:
inst = "XSHOOTER"
if skylines:
calibrationRootPath = get_calibrations_path(log=log, settings=settings)
skylines = calibrationRootPath + "/" + dp["skylines"]
# SPEC FORMAT TO PANDAS DATAFRAME
from astropy.table import Table
dat = Table.read(skylines, format="fits")
skylinesDF = dat.to_pandas()
else:
skylinesDF = False
# COMBINE MASK WITH THE BAD PIXEL MASK
if not isinstance(dispMapImage, bool):
gridLinePixelTable, interOrderMask = create_dispersion_solution_grid_lines_for_plot(
log=log, dispMap=dispMap, dispMapImage=dispMapImage, associatedFrame=CCDObject, kw=kw, skylines=skylinesDF
)
try:
mask = (frame.mask == 1) | (interOrderMask == 1)
except:
mask = interOrderMask == 1
try:
frame.mask = mask
except:
pass
if inst == "SOXS":
rotatedImg = np.flipud(frame)
elif inst == "XSHOOTER":
rotatedImg = np.rot90(frame, 1)
else:
rotatedImg = frame
rotatedImg = np.flipud(rotatedImg)
from astropy.stats import sigma_clipped_stats
mean, median, std = sigma_clipped_stats(frame, sigma=50.0, stdfunc="mad_std", cenfunc="median", maxiters=3)
# std = np.nanstd(frame)
# mean = np.nanmean(frame)
# median = np.nanmean(frame)
palette = copy(plt.cm.viridis)
palette.set_bad("#dc322f", 1.0)
vmax = median + stdWindow * 0.5 * std
vmin = median - stdWindow * 0.5 * std
if surfacePlot:
from matplotlib import rc
axisColour = "#002b36"
rc("axes", edgecolor=axisColour, labelcolor=axisColour, linewidth=0.6)
rc("xtick", color=axisColour)
rc("ytick", color=axisColour)
rc("grid", color=axisColour)
rc("text", color=axisColour)
fig = plt.figure(figsize=(20, 8))
ax = fig.add_subplot(121, projection="3d")
if inst == "XSHOOTER":
plt.gca().invert_yaxis()
ax.set_box_aspect(aspect=(2, 1, 1))
# Remove gray panes and axis grid
ax.xaxis.pane.fill = False
ax.zaxis.pane.set_facecolor("#dc322f")
ax.zaxis.pane.set_alpha(1.0)
ax.yaxis.pane.fill = False
ax.grid(False)
# Remove z-axis
# ax.w_zaxis.line.set_lw(0.)
# ax.set_zticks([])
X, Y = np.meshgrid(
np.linspace(0, rotatedImg.shape[1], rotatedImg.shape[1]),
np.linspace(0, rotatedImg.shape[0], rotatedImg.shape[0]),
)
surface = ax.plot_surface(X=X, Y=Y, Z=rotatedImg, cmap="viridis", antialiased=True, vmin=vmin, vmax=vmax)
if inst == "SOXS":
ax.azim = 70
else:
ax.azim = -120
ax.elev = 30
ax.set_xlim(0, rotatedImg.shape[1])
ax.set_ylim(0, rotatedImg.shape[0])
ax.set_zlim(vmin, min(np.nanmax(frame), vmax * 1.2))
if inst == "SOXS":
ax.invert_yaxis()
backgroundColour = "white"
fig.set_facecolor(backgroundColour)
ax.set_facecolor(backgroundColour)
ax.xaxis.pane.set_edgecolor(backgroundColour)
ax.yaxis.pane.set_edgecolor(backgroundColour)
ax.zaxis.pane.set_edgecolor(backgroundColour)
if inst == "SOXS":
plt.xlabel("x-axis", fontsize=16)
plt.ylabel("y-axis", fontsize=16)
else:
plt.xlabel("y-axis", fontsize=16)
plt.ylabel("x-axis", fontsize=16)
ax2 = fig.add_subplot(122)
else:
if rotatedImg.shape[0] - rotatedImg.shape[1] > 1000:
fig = plt.figure(figsize=(5, 12))
else:
fig = plt.figure(figsize=(12, 5))
# palette.set_over('r', 1.0)
# palette.set_under('g', 1.0)
ax2 = fig.add_subplot(111)
if not isinstance(dispMapImage, bool):
for l in range(int(gridLinePixelTable["line"].max())):
mask = gridLinePixelTable["line"] == l
if inst == "SOXS":
ax2.plot(
gridLinePixelTable.loc[mask]["fit_x"],
gridLinePixelTable.loc[mask]["fit_y"],
"w-",
linewidth=0.5,
alpha=0.8,
color="black",
)
else:
ax2.plot(
gridLinePixelTable.loc[mask]["fit_y"],
gridLinePixelTable.loc[mask]["fit_x"],
"w-",
linewidth=0.5,
alpha=0.8,
color="black",
)
if rotatedImg.shape[0] - rotatedImg.shape[1] > 1000:
ax2.set_box_aspect(2.0)
else:
ax2.set_box_aspect(0.5)
detectorPlot = plt.imshow(rotatedImg, vmin=vmin, vmax=vmax, cmap=palette, alpha=1, aspect="auto")
if surfacePlot:
shrink = 0.5
else:
shrink = 1.0
if mean > 10:
fmt = "%1.0f"
fig.colorbar(detectorPlot, shrink=shrink, format=fmt)
else:
fig.colorbar(detectorPlot, shrink=shrink)
# plt.colorbar()
if title:
fig.suptitle(title, fontsize=20)
if inst == "XSHOOTER":
ax2.invert_yaxis()
# cbar.ticklabel_format(useOffset=False)
if inst == "SOXS":
plt.xlabel("x-axis", fontsize=16)
plt.ylabel("y-axis", fontsize=16)
else:
plt.xlabel("y-axis", fontsize=16)
plt.ylabel("x-axis", fontsize=16)
if show:
plt.pause(0.1)
# plt.show()
if saveToPath:
plt.savefig(saveToPath, dpi=120, format="pdf", bbox_inches="tight")
plt.clf() # clear figure
mpl.rcParams.update(originalRC)
plt.close("all")
log.debug("completed the ``quicklook_image`` function")
return None
[docs]
def unpack_order_table(
log,
orderTablePath,
extend=0.0,
pixelDelta=1,
binx=1,
biny=1,
prebinned=False,
order=False,
limitToDetectorFormat=False,
):
"""*Unpack an order location table and return an `orderPolyTable` dataframe containing the polynomial coefficients for the order centres and edges, an `orderPixelTable` dataframe containing the pixel-coordinates for each order centre and edges, and finally, an `orderMetaTable` dataframe giving metadata about the frame binning and format.*
**Key Arguments:**
- ``orderTablePath`` -- path to the order table
- ``extend`` -- fractional increase to the order area in the y-axis (needed for masking)
- ``pixelDelta`` -- space between returned data points. Default *1* (sampled at every pixel)
- ``binx`` -- binning in the x-axis (from FITS header). Default *1*
- ``biny`` -- binning in the y-axis (from FITS header). Default *1*
- ``prebinned`` -- was the order-table measured on a pre-binned frame (typically only for mflats). Default *False*
- ``order`` -- unpack only a single order
- ``limitToDetectorFormat`` -- limit the pixels return to those limited by the detector format static calibration table
**Usage:**
```python
# UNPACK THE ORDER TABLE
from soxspipe.commonutils.toolkit import unpack_order_table
orderPolyTable, orderPixelTable = unpack_order_table(
log=self.log, orderTablePath=orderTablePath, extend=0.)
```
"""
log.debug("starting the ``functionName`` function")
from astropy.table import Table
import pandas as pd
import numpy as np
import math
# PIXEL DELTA NEEDS TO BE ODD .. ELSE MASKING ON BINNED DATA GETS MESSED UP
if pixelDelta % 2 == 0:
pixelDelta += 1 # Return the nearest odd number above if it's even
# MAKE RELATIVE HOME PATH ABSOLUTE
home = expanduser("~")
orderTablePath = orderTablePath.replace("~", home)
dat = Table.read(orderTablePath, format="fits", hdu=1)
orderPolyTable = dat.to_pandas()
dat = Table.read(orderTablePath, format="fits", hdu=2)
orderMetaTable = dat.to_pandas()
if order:
mask = orderMetaTable["order"] == order
orderMetaTable = orderMetaTable.loc[mask]
if "degy_cent" in orderPolyTable.columns:
axisA = "x"
axisB = "y"
axisAbin = binx
axisBbin = biny
else:
axisA = "y"
axisB = "x"
axisAbin = biny
axisBbin = binx
# ADD AXIS B COORD LIST
if prebinned:
ratio = axisBbin
else:
ratio = 1
blower = orderMetaTable[f"{axisB}min"].values * ratio
bupper = orderMetaTable[f"{axisB}max"].values * ratio
brange = orderMetaTable[f"{axisB}max"].values * ratio - orderMetaTable[f"{axisB}min"].values * ratio
axisBcoords = [
np.arange(
0 if (math.floor(l) - int(r * extend)) < 0 else (math.floor(l) - int(r * extend)),
4200 if (math.ceil(u) + int(r * extend)) > 4200 else (math.ceil(u) + int(r * extend)),
pixelDelta,
)
for l, u, r in zip(blower, bupper, brange)
]
orders = [np.full_like(a, o) for a, o in zip(axisBcoords, orderMetaTable["order"].values)]
# CREATE DATA FRAME FROM A DICTIONARY OF LISTS
myDict = {f"{axisB}coord": np.concatenate(axisBcoords), "order": np.concatenate(orders)}
orderPixelTable = pd.DataFrame(myDict)
cent_coeff = [float(v) for k, v in orderPolyTable.iloc[0].items() if "cent_" in k]
poly = chebyshev_order_xy_polynomials(
log=log,
axisBCol=f"{axisB}coord",
orderCol="order",
orderDeg=int(orderPolyTable.iloc[0]["degorder_cent"]),
axisBDeg=int(orderPolyTable.iloc[0][f"deg{axisB}_cent"]),
).poly
orderPixelTable[f"{axisA}coord_centre"] = poly(orderPixelTable, *cent_coeff)
std_coeff = [float(v) for k, v in orderPolyTable.iloc[0].items() if "std_" in k]
if len(std_coeff):
poly = chebyshev_order_xy_polynomials(
log=log,
axisBDeg=int(orderPolyTable.iloc[0][f"deg{axisB}_cent"]),
orderDeg=int(orderPolyTable.iloc[0]["degorder_cent"]),
orderCol="order",
axisBCol=f"{axisB}coord",
).poly
orderPixelTable["std"] = poly(orderPixelTable, *std_coeff)
if f"deg{axisB}_edgeup" in orderPolyTable.columns:
upper_coeff = [float(v) for k, v in orderPolyTable.iloc[0].items() if "edgeup_" in k]
poly = chebyshev_order_xy_polynomials(
log=log,
axisBDeg=int(orderPolyTable.iloc[0][f"deg{axisB}_edgeup"]),
orderDeg=int(orderPolyTable.iloc[0]["degorder_edgeup"]),
orderCol="order",
axisBCol=f"{axisB}coord",
).poly
orderPixelTable[f"{axisA}coord_edgeup"] = poly(orderPixelTable, *upper_coeff)
if f"deg{axisB}_edgelow" in orderPolyTable.columns:
lower_coeff = [float(v) for k, v in orderPolyTable.iloc[0].items() if "edgelow_" in k]
poly = chebyshev_order_xy_polynomials(
log=log,
axisBDeg=int(orderPolyTable.iloc[0][f"deg{axisB}_edgelow"]),
orderDeg=int(orderPolyTable.iloc[0]["degorder_edgelow"]),
orderCol="order",
axisBCol=f"{axisB}coord",
).poly
orderPixelTable[f"{axisA}coord_edgelow"] = poly(orderPixelTable, *lower_coeff)
if axisAbin != 1:
for c in ["coord_centre", "coord_edgeup", "coord_edgelow"]:
if f"{axisA}{c}" in orderPixelTable.columns:
orderPixelTable[f"{axisA}{c}"] /= axisAbin
if axisBbin != 1:
orderMetaTable[f"{axisB}min"] /= axisBbin
orderMetaTable[f"{axisB}max"] /= axisBbin
orderPixelTable[f"{axisB}coord"] /= axisBbin
orderPixelTable["std"] /= axisBbin
mask = orderPixelTable[f"{axisB}coord"].mod(1) > 0
orderPixelTable = orderPixelTable.loc[~mask]
orderPixelTable[f"{axisB}coord"] = orderPixelTable[f"{axisB}coord"].round().astype("int")
log.debug("completed the ``functionName`` function")
return orderPolyTable, orderPixelTable, orderMetaTable
[docs]
def generic_quality_checks(log, frame, settings, recipeName, qcTable):
"""*measure very basic quality checks on a frame and return the QC table with results appended*
**Key Arguments:**
- `log` -- logger
- `frame` -- CCDData object
- `settings` -- soxspipe settings
- `recipeName` -- the name of the recipe
- `qcTable` -- the QC pandas data-frame to save the QC measurements
**Usage:**
```python
from soxspipe.commonutils.toolkit import generic_quality_checks
qcTable = generic_quality_checks(log=log, frame=myFrame, settings=settings, recipeName="my recipe", qcTable=qcTable)
```
"""
log.debug("starting the ``functionName`` function")
import numpy as np
import pandas as pd
# KEYWORD LOOKUP OBJECT - LOOKUP KEYWORD FROM DICTIONARY IN RESOURCES
# FOLDER
kw = keyword_lookup(log=log, settings=settings).get
kw = kw
arm = frame.header[kw("SEQ_ARM")]
dateObs = frame.header[kw("DATE_OBS")]
# nanCount = np.count_nonzero(np.isnan(frame.data))
utcnow = datetime.now(UTC)
utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S")
# COUNT BAD-PIXELS
badCount = frame.mask.sum()
totalPixels = np.size(frame.mask)
percent = float(badCount) / float(totalPixels)
percent = float("{:.6f}".format(percent))
if "dark" in recipeName.lower():
qcComment = "Number of hot pixels"
qcName = "HOTPIX NUM"
elif "flat" in recipeName.lower():
qcComment = "Number of cold pixels"
qcName = "COLDPIX NUM"
else:
qcComment = "Number of bad pixels"
qcName = "BADPIX NUM"
qcTable = pd.concat(
[
qcTable,
pd.Series(
{
"soxspipe_recipe": recipeName,
"qc_name": qcName,
"qc_value": int(badCount),
"qc_comment": qcComment,
"qc_unit": "",
"obs_date_utc": dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
if "dark" in recipeName.lower():
qcComment = "Fraction of hot pixels"
qcName = "HOTPIX FRAC"
elif "flat" in recipeName.lower():
qcComment = "Fraction of cold pixels"
qcName = "COLDPIX FRAC"
else:
qcComment = "Fraction of bad pixels"
qcName = "BADPIX FRAC"
qcTable = pd.concat(
[
qcTable,
pd.Series(
{
"soxspipe_recipe": recipeName,
"qc_name": qcName,
"qc_value": percent,
"qc_comment": qcComment,
"qc_unit": "",
"obs_date_utc": dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
log.debug("completed the ``functionName`` function")
return qcTable
[docs]
def spectroscopic_image_quality_checks(log, frame, orderTablePath, settings, recipeName, qcTable):
"""*measure and record spectroscopic image quailty checks*
**Key Arguments:**
- `log` -- logger
- `frame` -- CCDData object
- ``orderTablePath`` -- path to the order table
- `settings` -- soxspipe settings
- `recipeName` -- the name of the recipe
- `qcTable` -- the QC pandas data-frame to save the QC measurements
**Usage:**
```python
from soxspipe.commonutils.toolkit import spectroscopic_image_quality_checks
qcTable = spectroscopic_image_quality_checks(
log=log, frame=myFrame, settings=settings, recipeName="this recipe", qcTable=qcTable, orderTablePath=orderTablePath)
```
"""
log.debug("starting the ``functionName`` function")
import numpy.ma as ma
import numpy as np
import pandas as pd
from soxspipe.commonutils import detector_lookup
from astropy.io import fits
# KEYWORD LOOKUP OBJECT - LOOKUP KEYWORD FROM DICTIONARY IN RESOURCES
# FOLDER
kw = keyword_lookup(log=log, settings=settings).get
arm = frame.header[kw("SEQ_ARM")]
dateObs = frame.header[kw("DATE_OBS")]
try:
binx = frame.header[kw("WIN_BINX")]
biny = frame.header[kw("WIN_BINY")]
except:
if arm.lower() == "nir":
binx = 1
biny = 1
inst = frame.header[kw("INSTRUME")]
# DETECTOR PARAMETERS LOOKUP OBJECT
detectorParams = detector_lookup(log=log, settings=settings).get(arm)
if detectorParams["dispersion-axis"] == "x":
axisA = "x"
axisB = "y"
else:
axisA = "y"
axisB = "x"
# UNPACK THE ORDER TABLE
orderTableMeta, orderTablePixels, orderMetaTable = unpack_order_table(
log=log, orderTablePath=orderTablePath, binx=binx, biny=biny, prebinned=True
)
mask = np.ones_like(frame.data)
axisACoords_up = orderTablePixels[f"{axisA}coord_edgeup"].values
axisACoords_low = orderTablePixels[f"{axisA}coord_edgelow"].values
axisBCoords = orderTablePixels[f"{axisB}coord"].values
axisACoords_up = axisACoords_up.astype(int)
axisACoords_low = axisACoords_low.astype(int)
if axisA == "x":
for u, l, y in zip(axisACoords_up, axisACoords_low, axisBCoords):
y = int(y)
l = int(max(0, l))
u = int(min(mask.shape[1], u))
if 0 <= y < mask.shape[0] and l < u:
mask[y, l:u] = 0
else:
for u, l, x in zip(axisACoords_up, axisACoords_low, axisBCoords):
x = int(x)
l = int(max(0, l))
u = int(min(mask.shape[0], u))
if 0 <= x < mask.shape[1] and l < u:
mask[l:u, x] = 0
# COMBINE MASK WITH THE BAD PIXEL MASK
mask = (mask == 1) | (frame.mask == 1)
# PLOT ONE OF THE MASKED FRAMES TO CHECK
maskedFrame = ma.array(frame.data, mask=mask)
quicklook_image(log=log, CCDObject=maskedFrame, show=False, ext=None)
mean = np.ma.mean(maskedFrame)
flux = np.ma.sum(maskedFrame)
utcnow = datetime.utcnow()
utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S")
mean = "%0.*f" % (3, mean)
flux = "%0.*f" % (3, flux)
qcTable = pd.concat(
[
qcTable,
pd.Series(
{
"soxspipe_recipe": recipeName,
"qc_name": "INNER ORDER PIX MEAN",
"qc_value": mean,
"qc_comment": "[e-] Mean inner-order pixel value",
"qc_unit": "electrons",
"obs_date_utc": dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
qcTable = pd.concat(
[
qcTable,
pd.Series(
{
"soxspipe_recipe": recipeName,
"qc_name": "INNER ORDER PIX SUM",
"qc_value": flux,
"qc_comment": "[e-] Sum of all inner-order pixel values",
"qc_unit": "electrons",
"obs_date_utc": dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
log.debug("completed the ``functionName`` function")
return qcTable
[docs]
def get_calibrations_path(log, settings):
"""*return the root path to the static calibrations*
**Key Arguments:**
- `log` -- logger
- ``settings`` -- the settings dictionary
**Usage:**
```python
from soxspipe.commonutils.toolkit import get_calibrations_path
calibrationRootPath = get_calibrations_path(log=log, settings=settings)
```
"""
log.debug("starting the ``get_calibrations_path`` function")
# GENERATE PATH TO STATIC CALIBRATION DATA
if "instrument" in settings:
instrument = settings["instrument"]
else:
instrument = "soxs"
calibrationRootPath = os.path.dirname(os.path.dirname(__file__)) + "/resources/static_calibrations/" + instrument
log.debug("completed the ``get_calibrations_path`` function")
return calibrationRootPath
[docs]
def twoD_disp_map_image_to_dataframe(
log, slit_length, twoDMapPath, kw=False, associatedFrame=False, removeMaskedPixels=False, dispAxis="y"
):
"""*convert the 2D dispersion image map to a pandas dataframe*
**Key Arguments:**
- `log` -- logger
- `twoDMapPath` -- 2D dispersion map image path
- `kw` -- fits keyword lookup dictionary
- `associatedFrame` -- include a flux column in returned dataframe from a frame associated with the dispersion map. Default *False*
- `removeMaskedPixels` -- remove the masked pixels from the associated image? Default *False*
- `dispAxis` -- x or y. Needed for pixel scale calculation
**Usage:**
```python
from soxspipe.commonutils.toolkit import twoD_disp_map_image_to_dataframe
mapDF = twoD_disp_map_image_to_dataframe(log=log, twoDMapPath=twoDMap, associatedFrame=objectFrame, kw=kw)
```
"""
log.debug("starting the ``twoD_disp_map_image_to_dataframe`` function")
import pandas as pd
import numpy as np
from astropy.io import fits
# MAKE RELATIVE HOME PATH ABSOLUTE
from os.path import expanduser
home = expanduser("~")
if twoDMapPath[0] == "~":
twoDMapPath = twoDMapPath.replace("~", home)
binx = 1
biny = 1
# FIND THE APPROPRIATE PREDICTED LINE-LIST
if associatedFrame:
arm = associatedFrame.header[kw("SEQ_ARM")]
if arm != "NIR" and kw("WIN_BINX") in associatedFrame.header:
binx = int(associatedFrame.header[kw("WIN_BINX")])
biny = int(associatedFrame.header[kw("WIN_BINY")])
hdul = fits.open(twoDMapPath)
hdul["WAVELENGTH"].data = hdul["WAVELENGTH"].data.astype("float32")
hdul["SLIT"].data = hdul["SLIT"].data.astype("float32")
hdul["ORDER"].data = hdul["ORDER"].data.astype("float32")
binned = False
if binx > 1 or biny > 1:
binned = True
from astropy.nddata import block_reduce
minimumBinnedPixelValue = hdul["WAVELENGTH"].data.copy()
hdul["WAVELENGTH"].data = block_reduce(hdul["WAVELENGTH"].data, (biny, binx), func=np.mean)
hdul["SLIT"].data = block_reduce(hdul["SLIT"].data, (biny, binx), func=np.mean)
hdul["ORDER"].data = block_reduce(hdul["ORDER"].data, (biny, binx), func=np.mean)
minimumBinnedPixelValue = block_reduce(minimumBinnedPixelValue, (biny, binx), func=np.min)
minimumBinnedPixelValue = minimumBinnedPixelValue.flatten()
# MAKE X, Y ARRAYS TO THEN ASSOCIATE WITH WL, SLIT AND ORDER
xdim = hdul[0].data.shape[1]
ydim = hdul[0].data.shape[0]
xarray = np.tile(np.arange(0, xdim), ydim)
yarray = np.repeat(np.arange(0, ydim), xdim)
if not binned:
minimumBinnedPixelValue = np.ones_like(yarray)
thisDict = {
"x": xarray,
"y": yarray,
"wavelength": hdul["WAVELENGTH"].data.flatten().astype("float32"),
"slit_position": hdul["SLIT"].data.flatten().astype("float32"),
"order": hdul["ORDER"].data.flatten().astype("float32"),
"min": minimumBinnedPixelValue.astype("float32"),
}
if associatedFrame:
thisDict["flux"] = associatedFrame.data.flatten().astype("float32")
thisDict["mask"] = associatedFrame.mask.flatten().astype(bool)
thisDict["error"] = associatedFrame.uncertainty.array.flatten().astype("float32")
# REMOVE IF ABOVE .astype(float) IS WORKING
# try:
# if associatedFrame:
# thisDict["flux"] = associatedFrame.data.flatten()
# thisDict["mask"] = associatedFrame.mask.flatten()
# thisDict["error"] = associatedFrame.uncertainty.array.flatten()
# except Exception as e:
# if binned:
# minimumBinnedPixelValue = minimumBinnedPixelValue.byteswap().newbyteorder()
# thisDict = {
# "x": xarray,
# "y": yarray,
# "wavelength": hdul["WAVELENGTH"].data.flatten().byteswap().newbyteorder(),
# "slit_position": hdul["SLIT"].data.flatten().byteswap().newbyteorder(),
# "order": hdul["ORDER"].data.flatten().byteswap().newbyteorder(),
# "min": minimumBinnedPixelValue
# }
# if associatedFrame:
# thisDict["flux"] = associatedFrame.data.flatten().byteswap().newbyteorder()
# thisDict["mask"] = associatedFrame.mask.flatten().byteswap().newbyteorder()
# thisDict["error"] = associatedFrame.uncertainty.array.flatten().byteswap().newbyteorder()
mapDF = pd.DataFrame.from_dict(thisDict)
if removeMaskedPixels:
mask = mapDF["mask"] == False
mapDF = mapDF.loc[mask]
# REMOVE ZEROS
mask = mapDF["wavelength"] == 0
mapDF = mapDF.loc[~mask]
interOrderMask = hdul["ORDER"].data.copy()
interOrderMask = np.where(interOrderMask > 0, 0, interOrderMask)
interOrderMask = np.where(np.isnan(interOrderMask), 1, interOrderMask)
mapDF.dropna(how="all", subset=["wavelength", "slit_position", "order"], inplace=True)
# REMOVE FILTERED ROWS FROM DATA FRAME
mask = (mapDF["slit_position"] < -slit_length / 2) | (mapDF["slit_position"] > slit_length / 2)
mapDF = mapDF.loc[~mask]
mask = mapDF["min"] == 0
mapDF = mapDF.loc[~mask]
# SORT BY COLUMN NAME
mapDF.sort_values(["wavelength"], inplace=True)
# CALCULATE PIXEL SCALE
if dispAxis == "y":
mapDF.sort_values(["x", "y"], inplace=True)
else:
mapDF.sort_values(["y", "x"], inplace=True)
shiftedWlArray = list(mapDF["wavelength"].values)[1:]
shiftedWlArray.append(np.nan)
mapDF["pixelScale"] = mapDF["wavelength"] - shiftedWlArray
mask = (mapDF["pixelScale"] > 2) | (mapDF["pixelScale"] < -2)
mapDF.loc[mask, "pixelScale"] = 0.0
mapDF["pixelScale"] = mapDF["pixelScale"].abs()
# SORT BY COLUMN NAME
mapDF.sort_values(["wavelength"], inplace=True)
log.debug("completed the ``twoD_disp_map_image_to_dataframe`` function")
return mapDF, interOrderMask
[docs]
def predict_product_path(sofName, recipeName=False):
"""*predict the path of the recipe product from a given SOF name*
**Key Arguments:**
- `log` -- logger,
- `sofName` -- name or full path to the sof file
- ``recipeName`` -- name of the recipe being considered. Default *False*.
**Usage:**
```python
from soxspipe.commonutils import toolkit
productPath, startNightDate = toolkit.predict_product_path(sofFilePath)
```
"""
from astropy.time import Time, TimeDelta
import codecs
startNightDate = False
# TRY AND READ startNightDate FROM RAW FRAME DIRECTORY
# with codecs.open(sofName, encoding="utf-8", mode="r") as readFile:
# thisData = readFile.read()
# for l in thisData.split("\n"):
# if "raw/" in l:
# startNightDate = l.split("raw/")[1].split("/")[0]
# break
try:
sofName = os.path.basename(sofName)
except:
pass
if not recipeName:
recipeName = sys.argv[1]
if recipeName[0] == "-":
recipeName = sys.argv[2]
recipeName = "soxs-" + recipeName
sofName = sofName.replace(".sof", "")
if not startNightDate:
obsDate = sofName.split("_")[0]
startNightDate = ""
try:
obsDate = Time.strptime(obsDate, "%Y%m%dT%H%M%S", scale="utc")
night_start_offset = TimeDelta(15.0 * 60 * 60, format="sec")
startNightDate = obsDate - night_start_offset
startNightDate = startNightDate.strftime("%Y-%m-%d")
except:
print("Could not determine OBSDATE from sof filename")
pass
from soxspipe.commonutils import data_organiser
from fundamentals.logs import emptyLogger
log = emptyLogger()
do = data_organiser(log=log, rootDir=".", dbConnect=False)
currentSession, allSessions = do.session_list(silent=True)
do.close()
if "_STARE" in sofName:
sofName += "_EXTRACTED_MERGED"
if "_NOD" in sofName:
sofName += "_EXTRACTED_MERGED"
productPath = (
f"./sessions/{currentSession}/reduced/{startNightDate}/"
+ recipeName.replace("_", "-")
+ "/"
+ sofName
+ ".fits"
)
productPath = productPath.replace("//", "/")
return productPath, startNightDate
[docs]
def add_recipe_logger(log, productPath):
"""*add a recipe-specific handler to the default logger that writes the recipe's logs adjacent to the recipe project*
**Key Arguments:**
- `log` -- original logger
- `productPath` -- path to the recipe product
**Usage:**
```python
from soxspipe.commonutils.toolkit import add_recipe_logger
log = add_recipe_logger(log, productPath="/path/to/product")
```
"""
import logging
import os
i = 0
while i < 3:
for handler in log.handlers:
if handler.get_name() == "recipeLog":
log.removeHandler(handler)
if handler.get_name() == "recipeErr":
log.removeHandler(handler)
i += 1
# GET THE EXTENSION (WITH DOT PREFIX)
loggingPath = os.path.splitext(productPath)[0] + ".log"
loggingErrorPath = os.path.splitext(productPath)[0] + "_ERROR.log"
try:
os.remove(loggingPath)
os.remove(loggingErrorPath)
except:
pass
# PARENT DIRECTORY PATH NEEDS TO EXIST FOR LOGGER TO WRITE
parentDirectory = os.path.dirname(loggingPath)
if not os.path.exists(parentDirectory):
try:
os.makedirs(parentDirectory)
except:
pass
recipeLog = logging.FileHandler(loggingPath, mode="a", encoding=None, delay=False)
recipeLogFormatter = logging.Formatter("%(message)s")
recipeLog.set_name("recipeLog")
recipeLog.setLevel(logging.INFO + 1)
recipeLog.setFormatter(recipeLogFormatter)
recipeLog.addFilter(MaxFilter(logging.WARNING))
log.addHandler(recipeLog)
recipeErr = logging.FileHandler(loggingErrorPath, mode="a", encoding=None, delay=True)
recipeErrFormatter = logging.Formatter(
'%(asctime)s %(levelname)s: "%(pathname)s", line %(lineno)d, in %(funcName)s > %(message)s', "%Y-%m-%d %H:%M:%S"
)
recipeErr.set_name("recipeErr")
recipeErr.setLevel(logging.ERROR)
recipeErr.setFormatter(recipeErrFormatter)
log.addHandler(recipeErr)
return log
[docs]
class MaxFilter:
def __init__(self, max_level):
self.max_level = max_level
[docs]
def filter(self, record):
if record.levelno < self.max_level:
return True
[docs]
def create_dispersion_solution_grid_lines_for_plot(
log, dispMap, dispMapImage, associatedFrame, kw, skylines=False, slitPositions=False, slit_length=11
):
"""*given a dispersion solution and accompanying 2D dispersion map image, generate the grid lines to add to QC plots*
**Key Arguments:**
- `log` -- logger
- ``dispMap`` -- path to dispersion map. Default *False*
- ``dispMapImage`` -- the 2D dispersion map image
- `associatedFrame` -- a frame associated with the reduction (to read arm and binning info).
- `kw` -- fits header kw dictionary
- `skylines` -- a list of skylines to use as the grid. Default *False*
- `slitPositions` -- slit positions to plot (else plot min and max)
- `slit_length` -- length of the slit to use for the dispersion map dataframe (default 11)
**Returns:**
- `orderPixelTable` -- DataFrame containing the pixel coordinates for grid lines to plot.
- `interOrderMask` -- Mask array indicating inter-order regions.
**Usage:**
```python
from soxspipe.commonutils.toolkit import create_dispersion_solution_grid_lines_for_plot
gridLinePixelTable, interOrderMask = create_dispersion_solution_grid_lines_for_plot(
log=log,
dispMap=dispMap,
dispMapImage=dispMapImage,
associatedFrame=CCDObject,
kw=kw,
skylines=skylines
)
for l in range(int(gridLinePixelTable['line'].max())):
mask = (gridLinePixelTable['line'] == l)
ax.plot(gridLinePixelTable.loc[mask]["fit_y"], gridLinePixelTable.loc[mask]["fit_x"], "w-", linewidth=0.5, alpha=0.8, color="black")
```
"""
log.debug("starting the ``create_dispersion_solution_grid_lines_for_plot`` function")
import numpy as np
import pandas as pd
dispMapDF, interOrderMask = twoD_disp_map_image_to_dataframe(
log=log, slit_length=slit_length, twoDMapPath=dispMapImage, associatedFrame=associatedFrame, kw=kw
)
uniqueOrders = dispMapDF["order"].unique()
wlLims = []
sPos = []
for o in uniqueOrders:
filDF = dispMapDF.loc[dispMapDF["order"] == o]
wlRange = filDF["wavelength"].max() - filDF["wavelength"].min()
wlLims.append((filDF["wavelength"].min(), filDF["wavelength"].max()))
if isinstance(slitPositions, bool):
sPos.append((filDF["slit_position"].min(), filDF["slit_position"].max()))
else:
sPos.append(slitPositions)
lineNumber = 0
orderPixelTable_list = []
for o, wlLim, spLim in zip(uniqueOrders, wlLims, sPos):
wlRange = np.arange(wlLim[0], wlLim[1], 1)
wlRange = np.append(wlRange, [wlLim[1]])
for e in spLim:
myDict = {
"line": np.full_like(wlRange, lineNumber),
"order": np.full_like(wlRange, o),
"wavelength": wlRange,
"slit_position": np.full_like(wlRange, e),
}
orderPixelTable_list.append(pd.DataFrame(myDict))
lineNumber += 1
spRange = np.arange(min(spLim), max(spLim), 1)
spRange = np.append(spRange, [max(spLim)])
if not isinstance(skylines, bool):
mask = skylines["WAVELENGTH"].between(wlLim[0], wlLim[1])
wlRange = skylines.loc[mask]["WAVELENGTH"].values
else:
step = max(1, int((wlLim[1] - wlLim[0]) // 400))
wlRange = np.arange(wlLim[0], wlLim[1], step)
wlRange = np.append(wlRange, [wlLim[1]])
for l in wlRange:
myDict = {
"line": np.full_like(spRange, lineNumber),
"order": np.full_like(spRange, o),
"wavelength": np.full_like(spRange, l),
"slit_position": spRange,
}
orderPixelTable_list.append(pd.DataFrame(myDict))
lineNumber += 1
orderPixelTable = pd.concat(orderPixelTable_list, ignore_index=True)
orderPixelTable = dispersion_map_to_pixel_arrays(
log=log, dispersionMapPath=dispMap, orderPixelTable=orderPixelTable
)
log.debug("completed the ``create_dispersion_solution_grid_lines_for_plot`` function")
return orderPixelTable, interOrderMask
[docs]
def get_calibration_lamp(log, frame, kw):
"""*given a frame, determine which calibration lamp is being used*
**Key Arguments:**
- `log` -- logger
- `frame` -- the frame to determine the calibration lamp for
- `kw` -- the FITS header keyword dictionary
**Usage:**
```python
from soxspipe.commonutils.toolkit import get_calibration_lamp
lamp = get_calibration_lamp(log=log, frame=frame, kw=kw)
```
"""
log.debug("starting the ``read_calibration_lamp`` function")
inst = frame.header["INSTRUME"]
lamp = None
for l in [kw("LAMP1"), kw("LAMP2"), kw("LAMP3"), kw("LAMP4"), kw("LAMP5"), kw("LAMP6"), kw("LAMP7")]:
if l in frame.header:
newLamp = frame.header[l]
newLamp = (
newLamp.replace("UVB_High", "QTH")
.replace("UVB_Low_", "")
.replace("NIR_", "")
.replace("VIS_", "")
.replace("UVB_", "")
.replace("_lamp", "")
.replace("_Lamp", "")
.replace("Argo", "Ar")
.replace("Neon", "Ne")
.replace("Merc", "Hg")
.replace("Xeno", "Xe")
)
if lamp:
lamp += newLamp
else:
lamp = newLamp
log.debug("completed the ``read_calibration_lamp`` function")
return lamp
[docs]
def qc_settings_plot_tables(log, qc, qcAx, settings, settingsAx):
"""*generate QC and settings table to be placed at the bottom of the QC plots*
**Key Arguments:**
- `log` -- logger
- `qc` -- date frame of collected QCs
- `qcAx` -- the axis to add the QC table to
- `settings` -- settings to report in settings table
- `settingsAx` -- the axis to add the settings table to
**Usage:**
```python
from soxspipe.commonutils.toolkit import qc_settings_plot_tables
qc_settings_plot_tables(log=log,qc=self.qc,qcAx=qcAx, settings=settings,settingsAx=settingsAx)
```
"""
log.debug("starting the ``qc_settings_plot_tables`` function")
import matplotlib as plt
import numpy as np
import pandas as pd
tables = []
cols = []
qcCopy = qc.copy()
if "qc_order" in qcCopy.columns:
qcCopy = qcCopy.loc[((qcCopy["qc_order"] == "-1") | (qcCopy["qc_order"].isna()) | (qcCopy["qc_order"] == -1))]
qcCopy["value"] = qcCopy["qc_value"].astype(str) + " " + qcCopy["qc_unit"]
qcCopy.loc[qcCopy["value"].isnull(), "value"] = qcCopy.loc[qcCopy["value"].isnull(), "qc_value"]
if len(qcCopy.index) > 10:
qcCopy = qcCopy.head(10)
columns1 = ["value", "qc_comment"]
colColours = plt.cm.Greys(np.full(len(columns1), 0.1))
rowColours = plt.cm.Greys(np.full(len(qcCopy.index), 0.1))
rowLabels = qcCopy["qc_name"].values
if len(qcCopy[columns1].values):
qcTable = qcAx.table(
cellText=qcCopy[columns1].values,
colLabels=columns1,
loc="center",
cellLoc="left",
rowColours=rowColours,
colColours=colColours,
rowLabels=rowLabels,
rowLoc="right",
fontsize=14,
)
tables.append(qcTable)
cols.append(columns1)
# qcAx.set_title(
# "QC Table", fontsize=9)
settingsCopy = {k: v for k, v in settings.items() if k not in ["nir", "vis", "uvb", "qc-acceptable-ranges"]}
settingsCopy = {"setting": settingsCopy.keys(), "value": settingsCopy.values()}
settingsDF = pd.DataFrame(settingsCopy)
columns2 = ["value"]
colColours = plt.cm.Greys(np.full(len(columns2), 0.1))
rowColours = plt.cm.Greys(np.full(len(settingsDF.index), 0.1))
rowLabels = settingsDF["setting"].values
settingsTable = settingsAx.table(
cellText=settingsDF[columns2].values,
colLabels=columns2,
loc="center",
cellLoc="left",
rowColours=rowColours,
colColours=colColours,
rowLabels=rowLabels,
rowLoc="right",
fontsize=14,
)
tables.append(settingsTable)
cols.append(columns2)
# settingsAx.set_title(
# "Parameters", fontsize=9, loc='left')
settingsAx.margins(x=0, y=0)
for t, c in zip(tables, cols):
t.scale(1, 1.5)
t.auto_set_font_size(False)
t.set_fontsize(4)
table_cells = t.properties()["children"]
for cell in table_cells:
cell.set_linewidth(0.3)
t.auto_set_column_width(list(range(len(c))))
for a in [qcAx, settingsAx]:
# Hide axes
a.get_xaxis().set_visible(False)
a.get_yaxis().set_visible(False)
a.axis("off")
log.debug("completed the ``qc_settings_plot_tables`` function")
return None
[docs]
def utility_setup(log, settings, recipeName, startNightDate):
"""*setup some tools needed by most soxspipe utils*
**Key Arguments:**
- `log` -- logger
- `settings` -- the settings dictionary
- `recipeName` -- name of the recipe as it appears in the settings dictionary
- `startNightDate` -- YYYY-MM-DD date of the observation night. Default ""
**Return:**
- `qcDir` -- the QC directory (created if missing)
- `productDir` -- the product directory (created if missing)
**Usage:**
```python
# Example usage with timezone-aware UTC datetime
from datetime import datetime, UTC
startNightDate = datetime.now(UTC).strftime("%Y-%m-%d")
qcDir, productDir = utility_setup(log=log, settings=settings, recipeName="my_recipe", startNightDate=startNightDate)
```
"""
log.debug("starting the ``utility_setup`` function")
from os.path import expanduser
home = expanduser("~")
recipeName = recipeName.replace("-obj", "")
# QC DIR
qcDir = settings["workspace-root-dir"].replace("~", home) + f"/qc/{startNightDate}/{recipeName}/"
qcDir = qcDir.replace("//", "/")
# RECURSIVELY CREATE MISSING DIRECTORIES
if not os.path.exists(qcDir):
try:
os.makedirs(qcDir)
except Exception as e:
pass
# PRODUCT DIR
productDir = settings["workspace-root-dir"].replace("~", home) + f"/reduced/{startNightDate}/{recipeName}/"
productDir = productDir.replace("//", "/")
# RECURSIVELY CREATE MISSING DIRECTORIES
if not os.path.exists(productDir):
try:
os.makedirs(productDir)
except Exception as e:
pass
log.debug("completed the ``utility_setup`` function")
return qcDir, productDir
[docs]
def plot_merged_spectrum_qc(
merged_orders,
products,
log,
qcDir,
filenameTemplate,
noddingSequence,
dateObs,
arm,
recipeName,
orderJoins=False,
debug=False,
fluxCalibrated=False,
qcTable=False,
settings=False,
):
"""
Plot merged spectrum QC plot as a standalone function.
Returns:
products (pd.DataFrame): Updated products table.
filePath (str): Path to the saved QC plot PDF.
"""
log.debug("starting the ``plot_merged_spectrum_qc`` function")
# DETECTOR PARAMETERS LOOKUP OBJECT
from soxspipe.commonutils import detector_lookup
# DO NOT PLOT IF PRODUCT TABLE HAS NOT BEEN PASSED
if isinstance(products, bool) and not products:
return products, None
import matplotlib.pyplot as plt
from datetime import datetime
import pandas as pd
from astropy import units as u
from astropy.stats import sigma_clipped_stats
import numpy as np
if not noddingSequence:
noddingSequence = ""
# DETECTOR PARAMETERS LOOKUP OBJECT
dp = detector_lookup(log=log, settings=settings).get(arm)
# GET SKYLINE DATA
calibrationRootPath = get_calibrations_path(log=log, settings=settings)
skylines = calibrationRootPath + "/" + dp["skylines"]
# SPEC FORMAT TO PANDAS DATAFRAME
from astropy.table import Table
dat = Table.read(skylines, format="fits")
skylinesDF = dat.to_pandas()
# FILTER TO STRONG SKY LINES ONLY FOR PLOTTING
skylinesDF["FLUX"] = skylinesDF["FLUX"].astype(float)
if arm == "VIS":
mask = skylinesDF["FLUX"] > 5
else:
mask = skylinesDF["FLUX"] > 500
skylinesDF = skylinesDF.loc[mask]
fig = plt.figure(figsize=(14, 10), constrained_layout=True, dpi=180)
# Adjusted height ratios
gs = fig.add_gridspec(5, 1, height_ratios=[3, 1, 1, 1, 0])
# Top panel with linear scale
top_panel = fig.add_subplot(gs[0, :])
if fluxCalibrated:
top_panel.set_ylabel("flux (erg s$^{-1}$ cm$^{-2}$ $\\AA^{-1}$)", fontsize=10)
else:
top_panel.set_ylabel("flux ($e^{-}$)", fontsize=10)
top_panel.set_title(
f"Optimally Extracted Order-Merged Object Spectrum ({arm.upper()})\n{filenameTemplate.replace('.fits', '')}",
fontsize=11,
linespacing=2.0,
)
top_panel.plot(
merged_orders["WAVE"],
merged_orders["FLUX_COUNTS"],
linewidth=0.3,
color="#dc322f" if not fluxCalibrated else "#2aa198",
zorder=1,
)
try:
top_panel.set_xlim(merged_orders["WAVE"].min().value, merged_orders["WAVE"].max().value)
except Exception:
top_panel.set_xlim(merged_orders["WAVE"].min(), merged_orders["WAVE"].max())
# Middle panel with log scale
middle_panel = fig.add_subplot(gs[1, :])
if not fluxCalibrated:
middle_panel.set_ylabel("flux ($e^{-}$)", fontsize=10)
else:
middle_panel.set_ylabel("flux (erg s$^{-1}$ cm$^{-2}$ $\\AA^{-1}$)", fontsize=10)
middle_panel.set_xlabel("wavelength (nm)", fontsize=10)
middle_panel.set_yscale("log")
middle_panel.plot(
merged_orders["WAVE"],
merged_orders["FLUX_COUNTS"],
linewidth=0.3,
color="#dc322f" if not fluxCalibrated else "#2aa198",
zorder=1,
)
from astropy.stats import sigma_clip, mad_std
# SIGMA-CLIP THE DATA
arrayMask = sigma_clip(
merged_orders["FLUX_COUNTS"], sigma_lower=3, sigma_upper=15.0, maxiters=1, cenfunc="mean", stdfunc="std"
)
mean, median, std = sigma_clipped_stats(
merged_orders["FLUX_COUNTS"], sigma=5.0, stdfunc="std", cenfunc="mean", maxiters=3
)
maxFlux = arrayMask.max() + 3 * std
minFlux = arrayMask.min() - 3 * std
top_panel.set_ylim(minFlux, maxFlux)
middle_panel.set_ylim(max(arrayMask.min() * 0.5, 0), arrayMask.max() * 2)
try:
middle_panel.set_xlim(merged_orders["WAVE"].min().value, merged_orders["WAVE"].max().value)
except Exception:
middle_panel.set_xlim(merged_orders["WAVE"].min(), merged_orders["WAVE"].max())
# Bottom panel with linear scale for SNR
bottom_panel = fig.add_subplot(gs[2, :])
bottom_panel.set_ylabel("SNR", fontsize=10)
bottom_panel.set_xlabel("wavelength (nm)", fontsize=10)
if not isinstance(qcTable, bool):
qcTable = qcTable.drop_duplicates(subset=["qc_name", "qc_order"], keep="last")
snrValue = qcTable.loc[qcTable["qc_name"] == "SNR MEDIAN"]
orderValue = snrValue["qc_order"].values
for i in range(len(orderValue)):
try:
orderValue[i] = int(orderValue[i])
except (ValueError, TypeError):
pass
orderValue = np.array(["GLOBAL" if pd.isna(v) else v for v in orderValue])
snrValue = snrValue["qc_value"].values
bottom_panel.plot(merged_orders["WAVE"], merged_orders["SNR"], linewidth=0.4, color="black", zorder=1)
try:
bottom_panel.set_xlim(merged_orders["WAVE"].min().value, merged_orders["WAVE"].max().value)
except Exception:
bottom_panel.set_xlim(merged_orders["WAVE"].min(), merged_orders["WAVE"].max())
mean, median, std = sigma_clipped_stats(merged_orders["SNR"], sigma=5.0, stdfunc="std", cenfunc="mean", maxiters=3)
bottom_panel.set_ylim(0, mean + 4 * std)
# ADD SNR VALUES TO BOTTOM PANEL
if not isinstance(qcTable, bool) and len(orderValue):
_vis_rank = {"GLOBAL": 0, "u": 1, "g": 2, "r": 3, "i": 4}
def _order_key(pair):
o = pair[0]
if o in _vis_rank:
return (0, _vis_rank[o])
try:
return (1, float(o))
except (ValueError, TypeError):
return (2, str(o))
pairs = sorted(zip(orderValue, snrValue), key=_order_key)
snr_text = "\n".join(f"{o}: {v:.0f}" for o, v in pairs)
bottom_panel.text(
0.99,
0.98,
snr_text,
transform=bottom_panel.transAxes,
ha="right",
va="top",
fontsize=5,
family="monospace",
color="black",
zorder=1,
)
# SKY PANEL WITH SKY FLUX
sky_panel = fig.add_subplot(gs[3, :])
sky_panel.set_xlabel("wavelength (nm)", fontsize=10)
sky_panel.set_ylabel("sky flux ($e^{-}$)", fontsize=10)
sky_panel.set_yscale("log")
if "SKY_COUNTS" in merged_orders.columns:
sky_panel.plot(
merged_orders["WAVE"],
merged_orders["SKY_COUNTS"],
linewidth=0.3,
color="#859900" if not fluxCalibrated else "#2aa198",
zorder=1,
)
sky_panel.set_ylim(10, merged_orders["SKY_COUNTS"].max() * 1.1)
try:
sky_panel.set_xlim(merged_orders["WAVE"].min().value, merged_orders["WAVE"].max().value)
except Exception:
sky_panel.set_xlim(merged_orders["WAVE"].min(), merged_orders["WAVE"].max())
if orderJoins:
for k, v in orderJoins.items():
for panel in [top_panel, middle_panel, bottom_panel]:
panel.axvline(v, color="black", linestyle="--", linewidth=0.5, alpha=0.5)
panel.text(
v + 5,
0.9 * panel.get_ylim()[1],
"ORDER JOIN",
rotation=90,
verticalalignment="top",
fontsize=6,
color="black",
alpha=0.5,
)
# PLOT SKY LINES AS VERTICAL LINES ON SKY PANEL
for _, row in skylinesDF.iterrows():
for panel in [top_panel, middle_panel, bottom_panel, sky_panel]:
panel.axvline(
row["WAVELENGTH"],
color="blue",
linestyle="-",
linewidth=0.2,
alpha=0.08,
zorder=0,
label="Sky Line" if panel == top_panel else "",
)
if fluxCalibrated:
filename = filenameTemplate.replace(".fits", f"_EXTRACTED_MERGED_FLUXCALIBRATED_QC_PLOT{noddingSequence}.pdf")
else:
filename = filenameTemplate.replace(".fits", f"_EXTRACTED_MERGED_QC_PLOT{noddingSequence}.pdf")
filePath = f"{qcDir}/{filename}"
if debug:
plt.show()
plt.savefig(filePath, dpi=120, bbox_inches="tight", format="pdf")
plt.close("all")
utcnow = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S")
products = pd.concat(
[
products,
pd.Series(
{
"soxspipe_recipe": recipeName,
"product_label": (
f"EXTRACTED_MERGED_FLUXCALIBRATED_QC_PLOT{noddingSequence}"
if fluxCalibrated
else f"EXTRACTED_MERGED_QC_PLOT{noddingSequence}"
),
"file_name": filename,
"file_type": "PDF",
"obs_date_utc": dateObs,
"reduction_date_utc": utcnow,
"product_desc": "QC plot of extracted order-merged source",
"file_path": filePath,
"label": "QC",
}
)
.to_frame()
.T,
],
ignore_index=True,
)
log.debug("completed the ``plot_merged_spectrum_qc`` function")
return products, filePath
[docs]
def calculate_rolling_snr(dataframe, flux_column, window_size):
"""
Calculate the rolling Signal-to-Noise Ratio (SNR) for a given column in a pandas DataFrame.
This function computes the rolling SNR for a specified column in the DataFrame using a custom
rolling window function. The SNR is calculated as the ratio of the median signal to the noise,
where the noise is estimated using a robust statistical method.
**Key Arguments:**
- `dataframe`: The input pandas DataFrame containing the data.
- `flux_column`: The name of the column in the DataFrame for which the rolling SNR
will be calculated.
- `window_size`: The size of the rolling window to use for the calculation.
**Return:**
- `dataframe`: The input DataFrame with an additional column 'SNR' containing the
calculated rolling SNR values.
**Usage:**
```python
from soxspipe.commonutils.toolkit import calculate_rolling_snr
df_with_snr = calculate_rolling_snr(dataframe=df, flux_column='flux', window_size=5)
```
"""
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view
values = dataframe[flux_column].to_numpy(dtype=np.float64, copy=False)
n = values.size
snr_full = np.full(n, np.nan, dtype=np.float64)
# Need at least 5 points for the noise estimator and enough data for one window
if window_size >= 5 and n >= window_size:
windows = sliding_window_view(values, window_shape=window_size)
# Signal: rolling median
signal = np.median(windows, axis=1)
# Noise: robust estimator based on 5-point second-difference pattern
diff = np.abs(2.0 * windows[:, 2:-2] - windows[:, :-4] - windows[:, 4:])
noise = 0.6052697 * np.median(diff, axis=1)
snr = np.divide(signal, noise, out=np.full_like(signal, np.nan), where=noise > 0)
# Match center=True placement
left = (window_size - 1) // 2
snr_full[left : left + snr.size] = snr
dataframe["SNR"] = snr_full
dataframe["SNR"] = dataframe["SNR"].replace([np.inf, -np.inf], np.nan).bfill().ffill()
return dataframe
[docs]
def extinction_correction_factor(wave, extinctionTablePath, airmass):
from scipy.interpolate import interp1d
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from astropy.table import Table
# READ THE EXTINCTION CURVE FOR THE OBSERVATORY
# DATA IS ORGANIZED AS FOLLOWS:
# FIRST COLUMN, WAVELENGTH (IN ANGSTROM), SECOND COLUMN MAG/AIRMASS
extinctionData = Table.read(extinctionTablePath, format="fits")
extinctionData = extinctionData.to_pandas()
# CONVERT ANG TO NM
wave_ext = extinctionData["WAVE"] / 10
# INTERPOLATING ON THE REQUIRED WAVE SCALE
refitted_ext = interp1d(
np.array(wave_ext), np.array(extinctionData["MAG_AIRMASS"]), kind="next", fill_value="extrapolate"
)
extCorrectionFactor = 10 ** (0.4 * refitted_ext(wave) * airmass)
return extCorrectionFactor
[docs]
def frame_to_32(frame):
"""*convert a given frame to 32-bit float format*
**Key Arguments:**
- `frame` -- the input frame
**Returns:**
- `frame` -- the converted frame
**Usage:**
```python
from soxspipe.commonutils.toolkit import frame_to_32
frame = frame_to_32(frame)
```
"""
from astropy.nddata import CCDData
import numpy as np
try:
if frame.data.dtype != np.float32:
frame.data = frame.data.astype(np.float32, copy=False)
except:
pass
try:
frame.uncertainty.array = frame.uncertainty.array.astype(np.float32, copy=False)
except:
pass
return frame
[docs]
def add_snr_efficiency_qcs(log, spectrumDF, qcTable, orderJoins, recipeName, dateObs):
"""*add quality checks to the qc table*
**Key Arguments:**
- ``log`` -- logger
- ``spectrumDF`` -- the dataframe containing the extracted spectrum with a column named 'SNR' or 'EFFICIENCY' to calculate the checks from.
- ``qcTable`` -- the qc table to which the SNR checks will be added
- ``orderJoins`` -- a dictionary containing the wavelengths of order joins (if any) to be added as QCs.
- ``recipeName`` -- name of the recipe to add to the QC entries
- ``dateObs`` -- the observation date to add to the QC entries (in ISO format, e.g., "2024-06-01T00:00:00")
**Return:**
- ``qcTable`` -- the updated qc table with SNR checks added
**Usage:**
```python
qcTable = add_snr_qcs(log, spectrumDF, qcTable, orderJoins)
```
"""
import numpy as np
import pandas as pd
from datetime import datetime
spectrumDF = spectrumDF.copy(deep=False)
spectrumDF["ORDER"] = np.nan
try:
spectrumDF["WAVE"] = spectrumDF["WAVE"].values.value
except:
pass
## REVERSE DICTIONARY KEYS SO FIRST KEY IS LAST
orderJoins = dict(reversed(list(orderJoins.items())))
for i, (orders, wl) in enumerate(zip(orderJoins.keys(), orderJoins.values())):
if len(orders) == 2:
visOrders = ["i", "r", "g", "u"]
lastOrder = visOrders[int(orders[0])]
order = visOrders[int(orders[0]) - 1]
elif len(orders) == 3:
lastOrder = int(orders[:1])
order = int(orders[1:])
else:
order = int(orders[:2])
lastOrder = int(orders[2::])
if i == 0:
spectrumDF.loc[(spectrumDF["WAVE"] <= wl), "ORDER"] = lastOrder
spectrumDF.loc[
spectrumDF["WAVE"] >= wl,
"ORDER",
] = order
utcnow = datetime.utcnow()
utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S")
# CALCULATE THE MEDIAN EFFICIENCY ACROSS ALL ORDERS
if "EFFICIENCY" in spectrumDF.columns:
medianEfficiency = spectrumDF["EFFICIENCY"].median()
qcTable = pd.concat(
[
qcTable,
pd.Series(
{
"soxspipe_recipe": recipeName,
"qc_name": "EFF MEDIAN",
"qc_value": float(f"{medianEfficiency:0.2f}"),
"qc_comment": "Median efficiency across all orders",
"qc_unit": None,
"obs_date_utc": dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
# CALCULATE THE MEDIAN EFFICIENCY IN EACH ORDER
orderEfficiency = spectrumDF.groupby("ORDER")["EFFICIENCY"].median().reset_index()
orderEfficiency.columns = ["ORDER", "MEDIAN_EFFICIENCY"]
for _, row in orderEfficiency.iterrows():
qcTable = pd.concat(
[
qcTable,
pd.Series(
{
"soxspipe_recipe": recipeName,
"qc_name": f"EFF MEDIAN",
"qc_value": float(f"{row['MEDIAN_EFFICIENCY']:0.2f}"),
"qc_comment": f"Median efficiency in order {row['ORDER']}",
"qc_order": row["ORDER"],
"qc_unit": None,
"obs_date_utc": dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
if "SNR" in spectrumDF.columns:
medianSNR = spectrumDF["SNR"].median()
qcTable = pd.concat(
[
qcTable,
pd.Series(
{
"soxspipe_recipe": recipeName,
"qc_name": "SNR MEDIAN",
"qc_value": float(f"{medianSNR:0.2f}"),
"qc_comment": "Median SNR across all orders",
"qc_unit": None,
"obs_date_utc": dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
orderSNR = spectrumDF.groupby("ORDER")["SNR"].median().reset_index()
orderSNR.columns = ["ORDER", "MEDIAN_SNR"]
for _, row in orderSNR.iterrows():
qcTable = pd.concat(
[
qcTable,
pd.Series(
{
"soxspipe_recipe": recipeName,
"qc_name": f"SNR MEDIAN",
"qc_value": float(f"{row['MEDIAN_SNR']:0.2f}"),
"qc_comment": f"Median SNR in order {row['ORDER']}",
"qc_order": row["ORDER"],
"qc_unit": None,
"obs_date_utc": dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
return qcTable