Source code for soxspipe.commonutils.toolkit

#!/usr/bin/env python
# encoding: utf-8
"""
*small reusable functions used throughout soxspipe*

Author
: David Young

Date Created
: September 18, 2020
"""

from os.path import expanduser
from soxspipe.commonutils import detector_lookup
from datetime import datetime, UTC
from soxspipe.commonutils import keyword_lookup
from soxspipe.commonutils.polynomials import chebyshev_xy_polynomial, chebyshev_order_xy_polynomials
from fundamentals import tools
from builtins import object
import sys
from soxspipe.commonutils.dispersion_map_to_pixel_arrays import dispersion_map_to_pixel_arrays
import os
from line_profiler import profile

os.environ["TERM"] = "vt100"


[docs] def cut_image_slice(log, frame, width, length, x, y, sliceAxis="x", median=False, debug=False): """*cut and return an N-pixel wide and M-pixels long slice, centred on a given coordinate from an image frame* **Key Arguments:** - ``log`` -- logger - ``frame`` -- the data array to cut the slice from (masked array) - ``width`` -- width of the slice (odd number) - ``length`` -- length of the slice - ``x`` -- x-coordinate - ``y`` -- y-coordinate - ``sliceAxis`` -- the axis along which slice is to be taken. Default *x* - ``median`` -- collapse the slice to a median value across its width - ``debug`` -- generate a plot of slice. Useful for debugging. **Usage:** ```python from soxspipe.commonutils.toolkit import cut_image_slice slice = cut_image_slice(log=self.log, frame=self.pinholeFlat.data, width=1, length=sliceLength, x=x_fit, y=y_fit, plot=False) if slice is None: return None ``` """ log.debug("starting the ``cut_image_slice`` function") import numpy.ma as ma import numpy as np import random halfSlice = length / 2 # NEED AN EVEN PIXEL SIZE if (width % 2) != 0: halfwidth = (width - 1) / 2 else: halfwidth = width / 2 if sliceAxis == "x": axisA = x axisB = y axisALen = frame.shape[1] axisBLen = frame.shape[0] elif sliceAxis == "y": axisB = x axisA = y axisALen = frame.shape[0] axisBLen = frame.shape[1] else: raise ValueError("sliceAxis needs to be either 'x' or 'y'") # CHECK WE ARE NOT GOING BEYOND BOUNDS OF FRAME if (axisA > axisALen - halfSlice) or (axisB > axisBLen - halfwidth) or (axisA < halfSlice) or (axisB < halfwidth): return None, None, None slice_length_offset = int(axisA - halfSlice) if sliceAxis == "x": sliceFull = frame[ int(axisB - halfwidth) : int(axisB + halfwidth + 1), slice_length_offset : int(axisA + halfSlice) ] else: sliceFull = frame[ slice_length_offset : int(axisA + halfSlice), int(axisB - halfwidth) : int(axisB + halfwidth + 1) ] slice_width_centre = (int(axisB + halfwidth + 1) + int(axisB - halfwidth)) / 2 # # FORCE CONVERSION OF CCDData OBJECT TO NUMPY ARRAY # maskedDataArray = np.ma.array(sliceFull.data, mask=sliceFull.mask) # try: # sliceFull.data=sliceFull.data - np.percentile(maskedDataArray, 30) # except: # pass if median: if sliceAxis == "y": slice = ma.median(sliceFull, axis=1) else: slice = ma.median(sliceFull, axis=0) if False and debug and random.randint(1, 101) < 5: import matplotlib.pyplot as plt # CHECK THE SLICE POINTS IF NEEDED if sliceAxis == "y": sliceImg = np.rot90(sliceFull, 1) else: sliceImg = sliceFull plt.imshow(sliceImg) plt.show() xx = np.arange(0, len(slice)) plt.figure(figsize=(8, 5)) if sliceAxis == "y": plt.plot(xx, slice, "ko", label=f"x={axisB}, y={axisA}, sliceAxis={sliceAxis}") if sliceAxis == "x": plt.plot(xx, slice, "ko", label=f"x={axisA}, y={axisB}, sliceAxis={sliceAxis}") plt.xlabel("Position") plt.ylabel("Flux") plt.legend() plt.show() log.debug("completed the ``cut_image_slice`` function") return slice, slice_length_offset, slice_width_centre
[docs] def quicklook_image( log, CCDObject, show=True, ext="data", stdWindow=3, title=False, surfacePlot=False, dispMap=False, dispMapImage=False, inst=False, settings=False, skylines=False, saveToPath=False, ): """*generate a quicklook image of a CCDObject - useful for development/debugging* **Key Arguments:** - ``log`` -- logger - ``CCDObject`` -- the CCDObject to plot - ``show`` -- show the image. Set to False to skip - ``ext`` -- the name of the the extension to show. Can be "data", "mask" or "err". Default "data". - ``title`` -- give a title for the plot - ``surfacePlot`` -- plot as a 3D surface plot - ``dispMap`` -- path to dispersion map. Default *False* - ``dispMapImage`` -- the 2D dispersion map image - ``inst`` -- provide instrument name if no header exists - ``skylines`` -- mark skylines on image ```python from soxspipe.commonutils.toolkit import quicklook_image quicklook_image( log=self.log, CCDObject=myframe, show=True) ``` """ log.debug("starting the ``quicklook_image`` function") if not show and not saveToPath: return import pandas as pd import matplotlib as mpl import numpy as np from copy import copy from soxspipe.commonutils.toolkit import twoD_disp_map_image_to_dataframe from soxspipe.commonutils import keyword_lookup from soxspipe.commonutils import detector_lookup originalRC = dict(mpl.rcParams) import matplotlib.pyplot as plt if settings: # KEYWORD LOOKUP OBJECT - LOOKUP KEYWORD FROM DICTIONARY IN RESOURCES # FOLDER kw = keyword_lookup(log=log, settings=settings).get arm = CCDObject.header[kw("SEQ_ARM")] dateObs = CCDObject.header[kw("DATE_OBS")] # DETECTOR PARAMETERS LOOKUP OBJECT detectorParams = detector_lookup(log=log, settings=settings).get(arm) # USE THIS ELSEWHERE IN THE OBJECT METHODS dp = detectorParams science_pixels = dp["science-pixels"] if ext == "data": frame = CCDObject.data elif ext == "mask": frame = CCDObject.mask elif ext == "uncertainty": frame = CCDObject.uncertainty.array else: # ASSUME ONLY NDARRAY frame = CCDObject if inst is False: try: inst = CCDObject.header["INSTRUME"] except: inst = "XSHOOTER" if skylines: calibrationRootPath = get_calibrations_path(log=log, settings=settings) skylines = calibrationRootPath + "/" + dp["skylines"] # SPEC FORMAT TO PANDAS DATAFRAME from astropy.table import Table dat = Table.read(skylines, format="fits") skylinesDF = dat.to_pandas() else: skylinesDF = False # COMBINE MASK WITH THE BAD PIXEL MASK if not isinstance(dispMapImage, bool): gridLinePixelTable, interOrderMask = create_dispersion_solution_grid_lines_for_plot( log=log, dispMap=dispMap, dispMapImage=dispMapImage, associatedFrame=CCDObject, kw=kw, skylines=skylinesDF ) try: mask = (frame.mask == 1) | (interOrderMask == 1) except: mask = interOrderMask == 1 try: frame.mask = mask except: pass if inst == "SOXS": rotatedImg = np.flipud(frame) elif inst == "XSHOOTER": rotatedImg = np.rot90(frame, 1) else: rotatedImg = frame rotatedImg = np.flipud(rotatedImg) from astropy.stats import sigma_clipped_stats mean, median, std = sigma_clipped_stats(frame, sigma=50.0, stdfunc="mad_std", cenfunc="median", maxiters=3) # std = np.nanstd(frame) # mean = np.nanmean(frame) # median = np.nanmean(frame) palette = copy(plt.cm.viridis) palette.set_bad("#dc322f", 1.0) vmax = median + stdWindow * 0.5 * std vmin = median - stdWindow * 0.5 * std if surfacePlot: from matplotlib import rc axisColour = "#002b36" rc("axes", edgecolor=axisColour, labelcolor=axisColour, linewidth=0.6) rc("xtick", color=axisColour) rc("ytick", color=axisColour) rc("grid", color=axisColour) rc("text", color=axisColour) fig = plt.figure(figsize=(20, 8)) ax = fig.add_subplot(121, projection="3d") if inst == "XSHOOTER": plt.gca().invert_yaxis() ax.set_box_aspect(aspect=(2, 1, 1)) # Remove gray panes and axis grid ax.xaxis.pane.fill = False ax.zaxis.pane.set_facecolor("#dc322f") ax.zaxis.pane.set_alpha(1.0) ax.yaxis.pane.fill = False ax.grid(False) # Remove z-axis # ax.w_zaxis.line.set_lw(0.) # ax.set_zticks([]) X, Y = np.meshgrid( np.linspace(0, rotatedImg.shape[1], rotatedImg.shape[1]), np.linspace(0, rotatedImg.shape[0], rotatedImg.shape[0]), ) surface = ax.plot_surface(X=X, Y=Y, Z=rotatedImg, cmap="viridis", antialiased=True, vmin=vmin, vmax=vmax) if inst == "SOXS": ax.azim = 70 else: ax.azim = -120 ax.elev = 30 ax.set_xlim(0, rotatedImg.shape[1]) ax.set_ylim(0, rotatedImg.shape[0]) ax.set_zlim(vmin, min(np.nanmax(frame), vmax * 1.2)) if inst == "SOXS": ax.invert_yaxis() backgroundColour = "white" fig.set_facecolor(backgroundColour) ax.set_facecolor(backgroundColour) ax.xaxis.pane.set_edgecolor(backgroundColour) ax.yaxis.pane.set_edgecolor(backgroundColour) ax.zaxis.pane.set_edgecolor(backgroundColour) if inst == "SOXS": plt.xlabel("x-axis", fontsize=16) plt.ylabel("y-axis", fontsize=16) else: plt.xlabel("y-axis", fontsize=16) plt.ylabel("x-axis", fontsize=16) ax2 = fig.add_subplot(122) else: if rotatedImg.shape[0] - rotatedImg.shape[1] > 1000: fig = plt.figure(figsize=(5, 12)) else: fig = plt.figure(figsize=(12, 5)) # palette.set_over('r', 1.0) # palette.set_under('g', 1.0) ax2 = fig.add_subplot(111) if not isinstance(dispMapImage, bool): for l in range(int(gridLinePixelTable["line"].max())): mask = gridLinePixelTable["line"] == l if inst == "SOXS": ax2.plot( gridLinePixelTable.loc[mask]["fit_x"], gridLinePixelTable.loc[mask]["fit_y"], "w-", linewidth=0.5, alpha=0.8, color="black", ) else: ax2.plot( gridLinePixelTable.loc[mask]["fit_y"], gridLinePixelTable.loc[mask]["fit_x"], "w-", linewidth=0.5, alpha=0.8, color="black", ) if rotatedImg.shape[0] - rotatedImg.shape[1] > 1000: ax2.set_box_aspect(2.0) else: ax2.set_box_aspect(0.5) detectorPlot = plt.imshow(rotatedImg, vmin=vmin, vmax=vmax, cmap=palette, alpha=1, aspect="auto") if surfacePlot: shrink = 0.5 else: shrink = 1.0 if mean > 10: fmt = "%1.0f" fig.colorbar(detectorPlot, shrink=shrink, format=fmt) else: fig.colorbar(detectorPlot, shrink=shrink) # plt.colorbar() if title: fig.suptitle(title, fontsize=20) if inst == "XSHOOTER": ax2.invert_yaxis() # cbar.ticklabel_format(useOffset=False) if inst == "SOXS": plt.xlabel("x-axis", fontsize=16) plt.ylabel("y-axis", fontsize=16) else: plt.xlabel("y-axis", fontsize=16) plt.ylabel("x-axis", fontsize=16) if show: plt.pause(0.1) # plt.show() if saveToPath: plt.savefig(saveToPath, dpi=120, format="pdf", bbox_inches="tight") plt.clf() # clear figure mpl.rcParams.update(originalRC) plt.close("all") log.debug("completed the ``quicklook_image`` function") return None
[docs] def unpack_order_table( log, orderTablePath, extend=0.0, pixelDelta=1, binx=1, biny=1, prebinned=False, order=False, limitToDetectorFormat=False, ): """*Unpack an order location table and return an `orderPolyTable` dataframe containing the polynomial coefficients for the order centres and edges, an `orderPixelTable` dataframe containing the pixel-coordinates for each order centre and edges, and finally, an `orderMetaTable` dataframe giving metadata about the frame binning and format.* **Key Arguments:** - ``orderTablePath`` -- path to the order table - ``extend`` -- fractional increase to the order area in the y-axis (needed for masking) - ``pixelDelta`` -- space between returned data points. Default *1* (sampled at every pixel) - ``binx`` -- binning in the x-axis (from FITS header). Default *1* - ``biny`` -- binning in the y-axis (from FITS header). Default *1* - ``prebinned`` -- was the order-table measured on a pre-binned frame (typically only for mflats). Default *False* - ``order`` -- unpack only a single order - ``limitToDetectorFormat`` -- limit the pixels return to those limited by the detector format static calibration table **Usage:** ```python # UNPACK THE ORDER TABLE from soxspipe.commonutils.toolkit import unpack_order_table orderPolyTable, orderPixelTable = unpack_order_table( log=self.log, orderTablePath=orderTablePath, extend=0.) ``` """ log.debug("starting the ``functionName`` function") from astropy.table import Table import pandas as pd import numpy as np import math # PIXEL DELTA NEEDS TO BE ODD .. ELSE MASKING ON BINNED DATA GETS MESSED UP if pixelDelta % 2 == 0: pixelDelta += 1 # Return the nearest odd number above if it's even # MAKE RELATIVE HOME PATH ABSOLUTE home = expanduser("~") orderTablePath = orderTablePath.replace("~", home) dat = Table.read(orderTablePath, format="fits", hdu=1) orderPolyTable = dat.to_pandas() dat = Table.read(orderTablePath, format="fits", hdu=2) orderMetaTable = dat.to_pandas() if order: mask = orderMetaTable["order"] == order orderMetaTable = orderMetaTable.loc[mask] if "degy_cent" in orderPolyTable.columns: axisA = "x" axisB = "y" axisAbin = binx axisBbin = biny else: axisA = "y" axisB = "x" axisAbin = biny axisBbin = binx # ADD AXIS B COORD LIST if prebinned: ratio = axisBbin else: ratio = 1 blower = orderMetaTable[f"{axisB}min"].values * ratio bupper = orderMetaTable[f"{axisB}max"].values * ratio brange = orderMetaTable[f"{axisB}max"].values * ratio - orderMetaTable[f"{axisB}min"].values * ratio axisBcoords = [ np.arange( 0 if (math.floor(l) - int(r * extend)) < 0 else (math.floor(l) - int(r * extend)), 4200 if (math.ceil(u) + int(r * extend)) > 4200 else (math.ceil(u) + int(r * extend)), pixelDelta, ) for l, u, r in zip(blower, bupper, brange) ] orders = [np.full_like(a, o) for a, o in zip(axisBcoords, orderMetaTable["order"].values)] # CREATE DATA FRAME FROM A DICTIONARY OF LISTS myDict = {f"{axisB}coord": np.concatenate(axisBcoords), "order": np.concatenate(orders)} orderPixelTable = pd.DataFrame(myDict) cent_coeff = [float(v) for k, v in orderPolyTable.iloc[0].items() if "cent_" in k] poly = chebyshev_order_xy_polynomials( log=log, axisBCol=f"{axisB}coord", orderCol="order", orderDeg=int(orderPolyTable.iloc[0]["degorder_cent"]), axisBDeg=int(orderPolyTable.iloc[0][f"deg{axisB}_cent"]), ).poly orderPixelTable[f"{axisA}coord_centre"] = poly(orderPixelTable, *cent_coeff) std_coeff = [float(v) for k, v in orderPolyTable.iloc[0].items() if "std_" in k] if len(std_coeff): poly = chebyshev_order_xy_polynomials( log=log, axisBDeg=int(orderPolyTable.iloc[0][f"deg{axisB}_cent"]), orderDeg=int(orderPolyTable.iloc[0]["degorder_cent"]), orderCol="order", axisBCol=f"{axisB}coord", ).poly orderPixelTable["std"] = poly(orderPixelTable, *std_coeff) if f"deg{axisB}_edgeup" in orderPolyTable.columns: upper_coeff = [float(v) for k, v in orderPolyTable.iloc[0].items() if "edgeup_" in k] poly = chebyshev_order_xy_polynomials( log=log, axisBDeg=int(orderPolyTable.iloc[0][f"deg{axisB}_edgeup"]), orderDeg=int(orderPolyTable.iloc[0]["degorder_edgeup"]), orderCol="order", axisBCol=f"{axisB}coord", ).poly orderPixelTable[f"{axisA}coord_edgeup"] = poly(orderPixelTable, *upper_coeff) if f"deg{axisB}_edgelow" in orderPolyTable.columns: lower_coeff = [float(v) for k, v in orderPolyTable.iloc[0].items() if "edgelow_" in k] poly = chebyshev_order_xy_polynomials( log=log, axisBDeg=int(orderPolyTable.iloc[0][f"deg{axisB}_edgelow"]), orderDeg=int(orderPolyTable.iloc[0]["degorder_edgelow"]), orderCol="order", axisBCol=f"{axisB}coord", ).poly orderPixelTable[f"{axisA}coord_edgelow"] = poly(orderPixelTable, *lower_coeff) if axisAbin != 1: for c in ["coord_centre", "coord_edgeup", "coord_edgelow"]: if f"{axisA}{c}" in orderPixelTable.columns: orderPixelTable[f"{axisA}{c}"] /= axisAbin if axisBbin != 1: orderMetaTable[f"{axisB}min"] /= axisBbin orderMetaTable[f"{axisB}max"] /= axisBbin orderPixelTable[f"{axisB}coord"] /= axisBbin orderPixelTable["std"] /= axisBbin mask = orderPixelTable[f"{axisB}coord"].mod(1) > 0 orderPixelTable = orderPixelTable.loc[~mask] orderPixelTable[f"{axisB}coord"] = orderPixelTable[f"{axisB}coord"].round().astype("int") log.debug("completed the ``functionName`` function") return orderPolyTable, orderPixelTable, orderMetaTable
[docs] def generic_quality_checks(log, frame, settings, recipeName, qcTable): """*measure very basic quality checks on a frame and return the QC table with results appended* **Key Arguments:** - `log` -- logger - `frame` -- CCDData object - `settings` -- soxspipe settings - `recipeName` -- the name of the recipe - `qcTable` -- the QC pandas data-frame to save the QC measurements **Usage:** ```python from soxspipe.commonutils.toolkit import generic_quality_checks qcTable = generic_quality_checks(log=log, frame=myFrame, settings=settings, recipeName="my recipe", qcTable=qcTable) ``` """ log.debug("starting the ``functionName`` function") import numpy as np import pandas as pd # KEYWORD LOOKUP OBJECT - LOOKUP KEYWORD FROM DICTIONARY IN RESOURCES # FOLDER kw = keyword_lookup(log=log, settings=settings).get kw = kw arm = frame.header[kw("SEQ_ARM")] dateObs = frame.header[kw("DATE_OBS")] # nanCount = np.count_nonzero(np.isnan(frame.data)) utcnow = datetime.now(UTC) utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S") # COUNT BAD-PIXELS badCount = frame.mask.sum() totalPixels = np.size(frame.mask) percent = float(badCount) / float(totalPixels) percent = float("{:.6f}".format(percent)) if "dark" in recipeName.lower(): qcComment = "Number of hot pixels" qcName = "HOTPIX NUM" elif "flat" in recipeName.lower(): qcComment = "Number of cold pixels" qcName = "COLDPIX NUM" else: qcComment = "Number of bad pixels" qcName = "BADPIX NUM" qcTable = pd.concat( [ qcTable, pd.Series( { "soxspipe_recipe": recipeName, "qc_name": qcName, "qc_value": int(badCount), "qc_comment": qcComment, "qc_unit": "", "obs_date_utc": dateObs, "reduction_date_utc": utcnow, "to_header": True, } ) .to_frame() .T, ], ignore_index=True, ) if "dark" in recipeName.lower(): qcComment = "Fraction of hot pixels" qcName = "HOTPIX FRAC" elif "flat" in recipeName.lower(): qcComment = "Fraction of cold pixels" qcName = "COLDPIX FRAC" else: qcComment = "Fraction of bad pixels" qcName = "BADPIX FRAC" qcTable = pd.concat( [ qcTable, pd.Series( { "soxspipe_recipe": recipeName, "qc_name": qcName, "qc_value": percent, "qc_comment": qcComment, "qc_unit": "", "obs_date_utc": dateObs, "reduction_date_utc": utcnow, "to_header": True, } ) .to_frame() .T, ], ignore_index=True, ) log.debug("completed the ``functionName`` function") return qcTable
[docs] def spectroscopic_image_quality_checks(log, frame, orderTablePath, settings, recipeName, qcTable): """*measure and record spectroscopic image quailty checks* **Key Arguments:** - `log` -- logger - `frame` -- CCDData object - ``orderTablePath`` -- path to the order table - `settings` -- soxspipe settings - `recipeName` -- the name of the recipe - `qcTable` -- the QC pandas data-frame to save the QC measurements **Usage:** ```python from soxspipe.commonutils.toolkit import spectroscopic_image_quality_checks qcTable = spectroscopic_image_quality_checks( log=log, frame=myFrame, settings=settings, recipeName="this recipe", qcTable=qcTable, orderTablePath=orderTablePath) ``` """ log.debug("starting the ``functionName`` function") import numpy.ma as ma import numpy as np import pandas as pd from soxspipe.commonutils import detector_lookup from astropy.io import fits # KEYWORD LOOKUP OBJECT - LOOKUP KEYWORD FROM DICTIONARY IN RESOURCES # FOLDER kw = keyword_lookup(log=log, settings=settings).get arm = frame.header[kw("SEQ_ARM")] dateObs = frame.header[kw("DATE_OBS")] try: binx = frame.header[kw("WIN_BINX")] biny = frame.header[kw("WIN_BINY")] except: if arm.lower() == "nir": binx = 1 biny = 1 inst = frame.header[kw("INSTRUME")] # DETECTOR PARAMETERS LOOKUP OBJECT detectorParams = detector_lookup(log=log, settings=settings).get(arm) if detectorParams["dispersion-axis"] == "x": axisA = "x" axisB = "y" else: axisA = "y" axisB = "x" # UNPACK THE ORDER TABLE orderTableMeta, orderTablePixels, orderMetaTable = unpack_order_table( log=log, orderTablePath=orderTablePath, binx=binx, biny=biny, prebinned=True ) mask = np.ones_like(frame.data) axisACoords_up = orderTablePixels[f"{axisA}coord_edgeup"].values axisACoords_low = orderTablePixels[f"{axisA}coord_edgelow"].values axisBCoords = orderTablePixels[f"{axisB}coord"].values axisACoords_up = axisACoords_up.astype(int) axisACoords_low = axisACoords_low.astype(int) if axisA == "x": for u, l, y in zip(axisACoords_up, axisACoords_low, axisBCoords): y = int(y) l = int(max(0, l)) u = int(min(mask.shape[1], u)) if 0 <= y < mask.shape[0] and l < u: mask[y, l:u] = 0 else: for u, l, x in zip(axisACoords_up, axisACoords_low, axisBCoords): x = int(x) l = int(max(0, l)) u = int(min(mask.shape[0], u)) if 0 <= x < mask.shape[1] and l < u: mask[l:u, x] = 0 # COMBINE MASK WITH THE BAD PIXEL MASK mask = (mask == 1) | (frame.mask == 1) # PLOT ONE OF THE MASKED FRAMES TO CHECK maskedFrame = ma.array(frame.data, mask=mask) quicklook_image(log=log, CCDObject=maskedFrame, show=False, ext=None) mean = np.ma.mean(maskedFrame) flux = np.ma.sum(maskedFrame) utcnow = datetime.utcnow() utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S") mean = "%0.*f" % (3, mean) flux = "%0.*f" % (3, flux) qcTable = pd.concat( [ qcTable, pd.Series( { "soxspipe_recipe": recipeName, "qc_name": "INNER ORDER PIX MEAN", "qc_value": mean, "qc_comment": "[e-] Mean inner-order pixel value", "qc_unit": "electrons", "obs_date_utc": dateObs, "reduction_date_utc": utcnow, "to_header": True, } ) .to_frame() .T, ], ignore_index=True, ) qcTable = pd.concat( [ qcTable, pd.Series( { "soxspipe_recipe": recipeName, "qc_name": "INNER ORDER PIX SUM", "qc_value": flux, "qc_comment": "[e-] Sum of all inner-order pixel values", "qc_unit": "electrons", "obs_date_utc": dateObs, "reduction_date_utc": utcnow, "to_header": True, } ) .to_frame() .T, ], ignore_index=True, ) log.debug("completed the ``functionName`` function") return qcTable
[docs] def read_spectral_format(log, settings, arm, dispersionMap=False, extended=True, binx=1, biny=1): """*read the spectral format table to get some key parameters* **Key Arguments:** - `log` -- logger - `settings` -- soxspipe settings - `arm` -- arm to retrieve format for - `dispersionMap` -- if a dispersion map is given, the minimum and maximum dispersion axis pixel limits are computed - `extended` -- the spectral format table can provide WLMIN/WLMAX (extended=False) or WLMINFUL/WLMAXFUL (extended=True) - ``binx`` -- binning in the x-axis (from FITS header). Default *1* - ``biny`` -- binning in the y-axis (from FITS header). Default *1* **Return:** - ``orderNums`` -- a list of the order numbers - ``waveLengthMin`` -- a list of the maximum wavelengths reached by each order - ``waveLengthMax`` -- a list of the minimum wavelengths reached by each order **Usage:** ```python from soxspipe.commonutils.toolkit import read_spectral_format # READ THE SPECTRAL FORMAT TABLE TO DETERMINE THE LIMITS OF THE TRACES orderNums, waveLengthMin, waveLengthMax = read_spectral_format( log=self.log, settings=self.settings, arm=arm) ``` """ log.debug("starting the ``read_spectral_format`` function") import numpy as np import pandas as pd from astropy.io import fits # DETECTOR PARAMETERS LOOKUP OBJECT dp = detector_lookup(log=log, settings=settings).get(arm) # KEYWORD LOOKUP OBJECT - LOOKUP KEYWORD FROM DICTIONARY IN RESOURCES # FOLDER kw = keyword_lookup(log=log, settings=settings).get science_pixels = dp["science-pixels"] # READ THE SPECTRAL FORMAT TABLE FILE home = expanduser("~") calibrationRootPath = get_calibrations_path(log=log, settings=settings) spectralFormatFile = calibrationRootPath + "/" + dp["spectral format table"] # SPEC FORMAT TO PANDAS DATAFRAME from astropy.table import Table dat = Table.read(spectralFormatFile, format="fits") specFormatTable = dat.to_pandas() # EXTRACT REQUIRED PARAMETERS orderNums = specFormatTable["ORDER"].values if extended or "WLMIN" not in specFormatTable.columns: waveLengthMin = specFormatTable["WLMINFUL"].values waveLengthMax = specFormatTable["WLMAXFUL"].values else: waveLengthMin = specFormatTable["WLMIN"].values waveLengthMax = specFormatTable["WLMAX"].values # USE DISPERSION MAP TO FIND X-Y LIMITS OF THE SPECTRAL FORMAT FOR EACH ORDER # WE WANT TO LIMIT THE EXTRACTION TO THESE REGIONS if not isinstance(dispersionMap, bool): myDict = {"order": np.asarray([]), "wavelength": np.asarray([]), "slit_position": np.asarray([])} for o, wmin, wmax in zip(orderNums, waveLengthMin, waveLengthMax): wlArray = np.array([wmin, wmax]) myDict["wavelength"] = np.append(myDict["wavelength"], wlArray) myDict["order"] = np.append(myDict["order"], np.ones(len(wlArray)) * o) myDict["slit_position"] = np.append(myDict["slit_position"], np.zeros(len(wlArray))) orderPixelTable = pd.DataFrame(myDict) orderPixelTable = dispersion_map_to_pixel_arrays( log=log, dispersionMapPath=dispersionMap, orderPixelTable=orderPixelTable, removeOffDetectorLocation=False ) orderPixelRanges = [] if dp["dispersion-axis"] == "x": axis = "y" rowCol = "rows" abinFactor = biny else: axis = "x" rowCol = "columns" abinFactor = binx amins = [] amaxs = [] for o in orderNums: amin = orderPixelTable.loc[orderPixelTable["order"] == o, f"fit_{axis}"].min() amax = orderPixelTable.loc[orderPixelTable["order"] == o, f"fit_{axis}"].max() if amin < 0: amin = 0 if amax > dp["science-pixels"][rowCol]["end"]: amax = dp["science-pixels"][rowCol]["end"] amins.append(amin / abinFactor) amaxs.append(amax / abinFactor) log.debug("completed the ``read_spectral_format`` function") return orderNums, waveLengthMin, waveLengthMax, amins, amaxs log.debug("completed the ``read_spectral_format`` function") return orderNums, waveLengthMin, waveLengthMax
[docs] def get_calibrations_path(log, settings): """*return the root path to the static calibrations* **Key Arguments:** - `log` -- logger - ``settings`` -- the settings dictionary **Usage:** ```python from soxspipe.commonutils.toolkit import get_calibrations_path calibrationRootPath = get_calibrations_path(log=log, settings=settings) ``` """ log.debug("starting the ``get_calibrations_path`` function") # GENERATE PATH TO STATIC CALIBRATION DATA if "instrument" in settings: instrument = settings["instrument"] else: instrument = "soxs" calibrationRootPath = os.path.dirname(os.path.dirname(__file__)) + "/resources/static_calibrations/" + instrument log.debug("completed the ``get_calibrations_path`` function") return calibrationRootPath
[docs] def twoD_disp_map_image_to_dataframe( log, slit_length, twoDMapPath, kw=False, associatedFrame=False, removeMaskedPixels=False, dispAxis="y" ): """*convert the 2D dispersion image map to a pandas dataframe* **Key Arguments:** - `log` -- logger - `twoDMapPath` -- 2D dispersion map image path - `kw` -- fits keyword lookup dictionary - `associatedFrame` -- include a flux column in returned dataframe from a frame associated with the dispersion map. Default *False* - `removeMaskedPixels` -- remove the masked pixels from the associated image? Default *False* - `dispAxis` -- x or y. Needed for pixel scale calculation **Usage:** ```python from soxspipe.commonutils.toolkit import twoD_disp_map_image_to_dataframe mapDF = twoD_disp_map_image_to_dataframe(log=log, twoDMapPath=twoDMap, associatedFrame=objectFrame, kw=kw) ``` """ log.debug("starting the ``twoD_disp_map_image_to_dataframe`` function") import pandas as pd import numpy as np from astropy.io import fits # MAKE RELATIVE HOME PATH ABSOLUTE from os.path import expanduser home = expanduser("~") if twoDMapPath[0] == "~": twoDMapPath = twoDMapPath.replace("~", home) binx = 1 biny = 1 # FIND THE APPROPRIATE PREDICTED LINE-LIST if associatedFrame: arm = associatedFrame.header[kw("SEQ_ARM")] if arm != "NIR" and kw("WIN_BINX") in associatedFrame.header: binx = int(associatedFrame.header[kw("WIN_BINX")]) biny = int(associatedFrame.header[kw("WIN_BINY")]) hdul = fits.open(twoDMapPath) hdul["WAVELENGTH"].data = hdul["WAVELENGTH"].data.astype("float32") hdul["SLIT"].data = hdul["SLIT"].data.astype("float32") hdul["ORDER"].data = hdul["ORDER"].data.astype("float32") binned = False if binx > 1 or biny > 1: binned = True from astropy.nddata import block_reduce minimumBinnedPixelValue = hdul["WAVELENGTH"].data.copy() hdul["WAVELENGTH"].data = block_reduce(hdul["WAVELENGTH"].data, (biny, binx), func=np.mean) hdul["SLIT"].data = block_reduce(hdul["SLIT"].data, (biny, binx), func=np.mean) hdul["ORDER"].data = block_reduce(hdul["ORDER"].data, (biny, binx), func=np.mean) minimumBinnedPixelValue = block_reduce(minimumBinnedPixelValue, (biny, binx), func=np.min) minimumBinnedPixelValue = minimumBinnedPixelValue.flatten() # MAKE X, Y ARRAYS TO THEN ASSOCIATE WITH WL, SLIT AND ORDER xdim = hdul[0].data.shape[1] ydim = hdul[0].data.shape[0] xarray = np.tile(np.arange(0, xdim), ydim) yarray = np.repeat(np.arange(0, ydim), xdim) if not binned: minimumBinnedPixelValue = np.ones_like(yarray) thisDict = { "x": xarray, "y": yarray, "wavelength": hdul["WAVELENGTH"].data.flatten().astype("float32"), "slit_position": hdul["SLIT"].data.flatten().astype("float32"), "order": hdul["ORDER"].data.flatten().astype("float32"), "min": minimumBinnedPixelValue.astype("float32"), } if associatedFrame: thisDict["flux"] = associatedFrame.data.flatten().astype("float32") thisDict["mask"] = associatedFrame.mask.flatten().astype(bool) thisDict["error"] = associatedFrame.uncertainty.array.flatten().astype("float32") # REMOVE IF ABOVE .astype(float) IS WORKING # try: # if associatedFrame: # thisDict["flux"] = associatedFrame.data.flatten() # thisDict["mask"] = associatedFrame.mask.flatten() # thisDict["error"] = associatedFrame.uncertainty.array.flatten() # except Exception as e: # if binned: # minimumBinnedPixelValue = minimumBinnedPixelValue.byteswap().newbyteorder() # thisDict = { # "x": xarray, # "y": yarray, # "wavelength": hdul["WAVELENGTH"].data.flatten().byteswap().newbyteorder(), # "slit_position": hdul["SLIT"].data.flatten().byteswap().newbyteorder(), # "order": hdul["ORDER"].data.flatten().byteswap().newbyteorder(), # "min": minimumBinnedPixelValue # } # if associatedFrame: # thisDict["flux"] = associatedFrame.data.flatten().byteswap().newbyteorder() # thisDict["mask"] = associatedFrame.mask.flatten().byteswap().newbyteorder() # thisDict["error"] = associatedFrame.uncertainty.array.flatten().byteswap().newbyteorder() mapDF = pd.DataFrame.from_dict(thisDict) if removeMaskedPixels: mask = mapDF["mask"] == False mapDF = mapDF.loc[mask] # REMOVE ZEROS mask = mapDF["wavelength"] == 0 mapDF = mapDF.loc[~mask] interOrderMask = hdul["ORDER"].data.copy() interOrderMask = np.where(interOrderMask > 0, 0, interOrderMask) interOrderMask = np.where(np.isnan(interOrderMask), 1, interOrderMask) mapDF.dropna(how="all", subset=["wavelength", "slit_position", "order"], inplace=True) # REMOVE FILTERED ROWS FROM DATA FRAME mask = (mapDF["slit_position"] < -slit_length / 2) | (mapDF["slit_position"] > slit_length / 2) mapDF = mapDF.loc[~mask] mask = mapDF["min"] == 0 mapDF = mapDF.loc[~mask] # SORT BY COLUMN NAME mapDF.sort_values(["wavelength"], inplace=True) # CALCULATE PIXEL SCALE if dispAxis == "y": mapDF.sort_values(["x", "y"], inplace=True) else: mapDF.sort_values(["y", "x"], inplace=True) shiftedWlArray = list(mapDF["wavelength"].values)[1:] shiftedWlArray.append(np.nan) mapDF["pixelScale"] = mapDF["wavelength"] - shiftedWlArray mask = (mapDF["pixelScale"] > 2) | (mapDF["pixelScale"] < -2) mapDF.loc[mask, "pixelScale"] = 0.0 mapDF["pixelScale"] = mapDF["pixelScale"].abs() # SORT BY COLUMN NAME mapDF.sort_values(["wavelength"], inplace=True) log.debug("completed the ``twoD_disp_map_image_to_dataframe`` function") return mapDF, interOrderMask
[docs] def predict_product_path(sofName, recipeName=False): """*predict the path of the recipe product from a given SOF name* **Key Arguments:** - `log` -- logger, - `sofName` -- name or full path to the sof file - ``recipeName`` -- name of the recipe being considered. Default *False*. **Usage:** ```python from soxspipe.commonutils import toolkit productPath, startNightDate = toolkit.predict_product_path(sofFilePath) ``` """ from astropy.time import Time, TimeDelta import codecs startNightDate = False # TRY AND READ startNightDate FROM RAW FRAME DIRECTORY # with codecs.open(sofName, encoding="utf-8", mode="r") as readFile: # thisData = readFile.read() # for l in thisData.split("\n"): # if "raw/" in l: # startNightDate = l.split("raw/")[1].split("/")[0] # break try: sofName = os.path.basename(sofName) except: pass if not recipeName: recipeName = sys.argv[1] if recipeName[0] == "-": recipeName = sys.argv[2] recipeName = "soxs-" + recipeName sofName = sofName.replace(".sof", "") if not startNightDate: obsDate = sofName.split("_")[0] startNightDate = "" try: obsDate = Time.strptime(obsDate, "%Y%m%dT%H%M%S", scale="utc") night_start_offset = TimeDelta(15.0 * 60 * 60, format="sec") startNightDate = obsDate - night_start_offset startNightDate = startNightDate.strftime("%Y-%m-%d") except: print("Could not determine OBSDATE from sof filename") pass from soxspipe.commonutils import data_organiser from fundamentals.logs import emptyLogger log = emptyLogger() do = data_organiser(log=log, rootDir=".", dbConnect=False) currentSession, allSessions = do.session_list(silent=True) do.close() if "_STARE" in sofName: sofName += "_EXTRACTED_MERGED" if "_NOD" in sofName: sofName += "_EXTRACTED_MERGED" productPath = ( f"./sessions/{currentSession}/reduced/{startNightDate}/" + recipeName.replace("_", "-") + "/" + sofName + ".fits" ) productPath = productPath.replace("//", "/") return productPath, startNightDate
[docs] def add_recipe_logger(log, productPath): """*add a recipe-specific handler to the default logger that writes the recipe's logs adjacent to the recipe project* **Key Arguments:** - `log` -- original logger - `productPath` -- path to the recipe product **Usage:** ```python from soxspipe.commonutils.toolkit import add_recipe_logger log = add_recipe_logger(log, productPath="/path/to/product") ``` """ import logging import os i = 0 while i < 3: for handler in log.handlers: if handler.get_name() == "recipeLog": log.removeHandler(handler) if handler.get_name() == "recipeErr": log.removeHandler(handler) i += 1 # GET THE EXTENSION (WITH DOT PREFIX) loggingPath = os.path.splitext(productPath)[0] + ".log" loggingErrorPath = os.path.splitext(productPath)[0] + "_ERROR.log" try: os.remove(loggingPath) os.remove(loggingErrorPath) except: pass # PARENT DIRECTORY PATH NEEDS TO EXIST FOR LOGGER TO WRITE parentDirectory = os.path.dirname(loggingPath) if not os.path.exists(parentDirectory): try: os.makedirs(parentDirectory) except: pass recipeLog = logging.FileHandler(loggingPath, mode="a", encoding=None, delay=False) recipeLogFormatter = logging.Formatter("%(message)s") recipeLog.set_name("recipeLog") recipeLog.setLevel(logging.INFO + 1) recipeLog.setFormatter(recipeLogFormatter) recipeLog.addFilter(MaxFilter(logging.WARNING)) log.addHandler(recipeLog) recipeErr = logging.FileHandler(loggingErrorPath, mode="a", encoding=None, delay=True) recipeErrFormatter = logging.Formatter( '%(asctime)s %(levelname)s: "%(pathname)s", line %(lineno)d, in %(funcName)s > %(message)s', "%Y-%m-%d %H:%M:%S" ) recipeErr.set_name("recipeErr") recipeErr.setLevel(logging.ERROR) recipeErr.setFormatter(recipeErrFormatter) log.addHandler(recipeErr) return log
[docs] class MaxFilter: def __init__(self, max_level): self.max_level = max_level
[docs] def filter(self, record): if record.levelno < self.max_level: return True
[docs] def create_dispersion_solution_grid_lines_for_plot( log, dispMap, dispMapImage, associatedFrame, kw, skylines=False, slitPositions=False, slit_length=11 ): """*given a dispersion solution and accompanying 2D dispersion map image, generate the grid lines to add to QC plots* **Key Arguments:** - `log` -- logger - ``dispMap`` -- path to dispersion map. Default *False* - ``dispMapImage`` -- the 2D dispersion map image - `associatedFrame` -- a frame associated with the reduction (to read arm and binning info). - `kw` -- fits header kw dictionary - `skylines` -- a list of skylines to use as the grid. Default *False* - `slitPositions` -- slit positions to plot (else plot min and max) - `slit_length` -- length of the slit to use for the dispersion map dataframe (default 11) **Returns:** - `orderPixelTable` -- DataFrame containing the pixel coordinates for grid lines to plot. - `interOrderMask` -- Mask array indicating inter-order regions. **Usage:** ```python from soxspipe.commonutils.toolkit import create_dispersion_solution_grid_lines_for_plot gridLinePixelTable, interOrderMask = create_dispersion_solution_grid_lines_for_plot( log=log, dispMap=dispMap, dispMapImage=dispMapImage, associatedFrame=CCDObject, kw=kw, skylines=skylines ) for l in range(int(gridLinePixelTable['line'].max())): mask = (gridLinePixelTable['line'] == l) ax.plot(gridLinePixelTable.loc[mask]["fit_y"], gridLinePixelTable.loc[mask]["fit_x"], "w-", linewidth=0.5, alpha=0.8, color="black") ``` """ log.debug("starting the ``create_dispersion_solution_grid_lines_for_plot`` function") import numpy as np import pandas as pd dispMapDF, interOrderMask = twoD_disp_map_image_to_dataframe( log=log, slit_length=slit_length, twoDMapPath=dispMapImage, associatedFrame=associatedFrame, kw=kw ) uniqueOrders = dispMapDF["order"].unique() wlLims = [] sPos = [] for o in uniqueOrders: filDF = dispMapDF.loc[dispMapDF["order"] == o] wlRange = filDF["wavelength"].max() - filDF["wavelength"].min() wlLims.append((filDF["wavelength"].min(), filDF["wavelength"].max())) if isinstance(slitPositions, bool): sPos.append((filDF["slit_position"].min(), filDF["slit_position"].max())) else: sPos.append(slitPositions) lineNumber = 0 orderPixelTable_list = [] for o, wlLim, spLim in zip(uniqueOrders, wlLims, sPos): wlRange = np.arange(wlLim[0], wlLim[1], 1) wlRange = np.append(wlRange, [wlLim[1]]) for e in spLim: myDict = { "line": np.full_like(wlRange, lineNumber), "order": np.full_like(wlRange, o), "wavelength": wlRange, "slit_position": np.full_like(wlRange, e), } orderPixelTable_list.append(pd.DataFrame(myDict)) lineNumber += 1 spRange = np.arange(min(spLim), max(spLim), 1) spRange = np.append(spRange, [max(spLim)]) if not isinstance(skylines, bool): mask = skylines["WAVELENGTH"].between(wlLim[0], wlLim[1]) wlRange = skylines.loc[mask]["WAVELENGTH"].values else: step = max(1, int((wlLim[1] - wlLim[0]) // 400)) wlRange = np.arange(wlLim[0], wlLim[1], step) wlRange = np.append(wlRange, [wlLim[1]]) for l in wlRange: myDict = { "line": np.full_like(spRange, lineNumber), "order": np.full_like(spRange, o), "wavelength": np.full_like(spRange, l), "slit_position": spRange, } orderPixelTable_list.append(pd.DataFrame(myDict)) lineNumber += 1 orderPixelTable = pd.concat(orderPixelTable_list, ignore_index=True) orderPixelTable = dispersion_map_to_pixel_arrays( log=log, dispersionMapPath=dispMap, orderPixelTable=orderPixelTable ) log.debug("completed the ``create_dispersion_solution_grid_lines_for_plot`` function") return orderPixelTable, interOrderMask
[docs] def get_calibration_lamp(log, frame, kw): """*given a frame, determine which calibration lamp is being used* **Key Arguments:** - `log` -- logger - `frame` -- the frame to determine the calibration lamp for - `kw` -- the FITS header keyword dictionary **Usage:** ```python from soxspipe.commonutils.toolkit import get_calibration_lamp lamp = get_calibration_lamp(log=log, frame=frame, kw=kw) ``` """ log.debug("starting the ``read_calibration_lamp`` function") inst = frame.header["INSTRUME"] lamp = None for l in [kw("LAMP1"), kw("LAMP2"), kw("LAMP3"), kw("LAMP4"), kw("LAMP5"), kw("LAMP6"), kw("LAMP7")]: if l in frame.header: newLamp = frame.header[l] newLamp = ( newLamp.replace("UVB_High", "QTH") .replace("UVB_Low_", "") .replace("NIR_", "") .replace("VIS_", "") .replace("UVB_", "") .replace("_lamp", "") .replace("_Lamp", "") .replace("Argo", "Ar") .replace("Neon", "Ne") .replace("Merc", "Hg") .replace("Xeno", "Xe") ) if lamp: lamp += newLamp else: lamp = newLamp log.debug("completed the ``read_calibration_lamp`` function") return lamp
[docs] def qc_settings_plot_tables(log, qc, qcAx, settings, settingsAx): """*generate QC and settings table to be placed at the bottom of the QC plots* **Key Arguments:** - `log` -- logger - `qc` -- date frame of collected QCs - `qcAx` -- the axis to add the QC table to - `settings` -- settings to report in settings table - `settingsAx` -- the axis to add the settings table to **Usage:** ```python from soxspipe.commonutils.toolkit import qc_settings_plot_tables qc_settings_plot_tables(log=log,qc=self.qc,qcAx=qcAx, settings=settings,settingsAx=settingsAx) ``` """ log.debug("starting the ``qc_settings_plot_tables`` function") import matplotlib as plt import numpy as np import pandas as pd tables = [] cols = [] qcCopy = qc.copy() if "qc_order" in qcCopy.columns: qcCopy = qcCopy.loc[((qcCopy["qc_order"] == "-1") | (qcCopy["qc_order"].isna()) | (qcCopy["qc_order"] == -1))] qcCopy["value"] = qcCopy["qc_value"].astype(str) + " " + qcCopy["qc_unit"] qcCopy.loc[qcCopy["value"].isnull(), "value"] = qcCopy.loc[qcCopy["value"].isnull(), "qc_value"] if len(qcCopy.index) > 10: qcCopy = qcCopy.head(10) columns1 = ["value", "qc_comment"] colColours = plt.cm.Greys(np.full(len(columns1), 0.1)) rowColours = plt.cm.Greys(np.full(len(qcCopy.index), 0.1)) rowLabels = qcCopy["qc_name"].values if len(qcCopy[columns1].values): qcTable = qcAx.table( cellText=qcCopy[columns1].values, colLabels=columns1, loc="center", cellLoc="left", rowColours=rowColours, colColours=colColours, rowLabels=rowLabels, rowLoc="right", fontsize=14, ) tables.append(qcTable) cols.append(columns1) # qcAx.set_title( # "QC Table", fontsize=9) settingsCopy = {k: v for k, v in settings.items() if k not in ["nir", "vis", "uvb", "qc-acceptable-ranges"]} settingsCopy = {"setting": settingsCopy.keys(), "value": settingsCopy.values()} settingsDF = pd.DataFrame(settingsCopy) columns2 = ["value"] colColours = plt.cm.Greys(np.full(len(columns2), 0.1)) rowColours = plt.cm.Greys(np.full(len(settingsDF.index), 0.1)) rowLabels = settingsDF["setting"].values settingsTable = settingsAx.table( cellText=settingsDF[columns2].values, colLabels=columns2, loc="center", cellLoc="left", rowColours=rowColours, colColours=colColours, rowLabels=rowLabels, rowLoc="right", fontsize=14, ) tables.append(settingsTable) cols.append(columns2) # settingsAx.set_title( # "Parameters", fontsize=9, loc='left') settingsAx.margins(x=0, y=0) for t, c in zip(tables, cols): t.scale(1, 1.5) t.auto_set_font_size(False) t.set_fontsize(4) table_cells = t.properties()["children"] for cell in table_cells: cell.set_linewidth(0.3) t.auto_set_column_width(list(range(len(c)))) for a in [qcAx, settingsAx]: # Hide axes a.get_xaxis().set_visible(False) a.get_yaxis().set_visible(False) a.axis("off") log.debug("completed the ``qc_settings_plot_tables`` function") return None
[docs] def utility_setup(log, settings, recipeName, startNightDate): """*setup some tools needed by most soxspipe utils* **Key Arguments:** - `log` -- logger - `settings` -- the settings dictionary - `recipeName` -- name of the recipe as it appears in the settings dictionary - `startNightDate` -- YYYY-MM-DD date of the observation night. Default "" **Return:** - `qcDir` -- the QC directory (created if missing) - `productDir` -- the product directory (created if missing) **Usage:** ```python # Example usage with timezone-aware UTC datetime from datetime import datetime, UTC startNightDate = datetime.now(UTC).strftime("%Y-%m-%d") qcDir, productDir = utility_setup(log=log, settings=settings, recipeName="my_recipe", startNightDate=startNightDate) ``` """ log.debug("starting the ``utility_setup`` function") from os.path import expanduser home = expanduser("~") recipeName = recipeName.replace("-obj", "") # QC DIR qcDir = settings["workspace-root-dir"].replace("~", home) + f"/qc/{startNightDate}/{recipeName}/" qcDir = qcDir.replace("//", "/") # RECURSIVELY CREATE MISSING DIRECTORIES if not os.path.exists(qcDir): try: os.makedirs(qcDir) except Exception as e: pass # PRODUCT DIR productDir = settings["workspace-root-dir"].replace("~", home) + f"/reduced/{startNightDate}/{recipeName}/" productDir = productDir.replace("//", "/") # RECURSIVELY CREATE MISSING DIRECTORIES if not os.path.exists(productDir): try: os.makedirs(productDir) except Exception as e: pass log.debug("completed the ``utility_setup`` function") return qcDir, productDir
[docs] def plot_merged_spectrum_qc( merged_orders, products, log, qcDir, filenameTemplate, noddingSequence, dateObs, arm, recipeName, orderJoins=False, debug=False, fluxCalibrated=False, qcTable=False, settings=False, ): """ Plot merged spectrum QC plot as a standalone function. Returns: products (pd.DataFrame): Updated products table. filePath (str): Path to the saved QC plot PDF. """ log.debug("starting the ``plot_merged_spectrum_qc`` function") # DETECTOR PARAMETERS LOOKUP OBJECT from soxspipe.commonutils import detector_lookup # DO NOT PLOT IF PRODUCT TABLE HAS NOT BEEN PASSED if isinstance(products, bool) and not products: return products, None import matplotlib.pyplot as plt from datetime import datetime import pandas as pd from astropy import units as u from astropy.stats import sigma_clipped_stats import numpy as np if not noddingSequence: noddingSequence = "" # DETECTOR PARAMETERS LOOKUP OBJECT dp = detector_lookup(log=log, settings=settings).get(arm) # GET SKYLINE DATA calibrationRootPath = get_calibrations_path(log=log, settings=settings) skylines = calibrationRootPath + "/" + dp["skylines"] # SPEC FORMAT TO PANDAS DATAFRAME from astropy.table import Table dat = Table.read(skylines, format="fits") skylinesDF = dat.to_pandas() # FILTER TO STRONG SKY LINES ONLY FOR PLOTTING skylinesDF["FLUX"] = skylinesDF["FLUX"].astype(float) if arm == "VIS": mask = skylinesDF["FLUX"] > 5 else: mask = skylinesDF["FLUX"] > 500 skylinesDF = skylinesDF.loc[mask] fig = plt.figure(figsize=(14, 10), constrained_layout=True, dpi=180) # Adjusted height ratios gs = fig.add_gridspec(5, 1, height_ratios=[3, 1, 1, 1, 0]) # Top panel with linear scale top_panel = fig.add_subplot(gs[0, :]) if fluxCalibrated: top_panel.set_ylabel("flux (erg s$^{-1}$ cm$^{-2}$ $\\AA^{-1}$)", fontsize=10) else: top_panel.set_ylabel("flux ($e^{-}$)", fontsize=10) top_panel.set_title( f"Optimally Extracted Order-Merged Object Spectrum ({arm.upper()})\n{filenameTemplate.replace('.fits', '')}", fontsize=11, linespacing=2.0, ) top_panel.plot( merged_orders["WAVE"], merged_orders["FLUX_COUNTS"], linewidth=0.3, color="#dc322f" if not fluxCalibrated else "#2aa198", zorder=1, ) try: top_panel.set_xlim(merged_orders["WAVE"].min().value, merged_orders["WAVE"].max().value) except Exception: top_panel.set_xlim(merged_orders["WAVE"].min(), merged_orders["WAVE"].max()) # Middle panel with log scale middle_panel = fig.add_subplot(gs[1, :]) if not fluxCalibrated: middle_panel.set_ylabel("flux ($e^{-}$)", fontsize=10) else: middle_panel.set_ylabel("flux (erg s$^{-1}$ cm$^{-2}$ $\\AA^{-1}$)", fontsize=10) middle_panel.set_xlabel("wavelength (nm)", fontsize=10) middle_panel.set_yscale("log") middle_panel.plot( merged_orders["WAVE"], merged_orders["FLUX_COUNTS"], linewidth=0.3, color="#dc322f" if not fluxCalibrated else "#2aa198", zorder=1, ) from astropy.stats import sigma_clip, mad_std # SIGMA-CLIP THE DATA arrayMask = sigma_clip( merged_orders["FLUX_COUNTS"], sigma_lower=3, sigma_upper=15.0, maxiters=1, cenfunc="mean", stdfunc="std" ) mean, median, std = sigma_clipped_stats( merged_orders["FLUX_COUNTS"], sigma=5.0, stdfunc="std", cenfunc="mean", maxiters=3 ) maxFlux = arrayMask.max() + 3 * std minFlux = arrayMask.min() - 3 * std top_panel.set_ylim(minFlux, maxFlux) middle_panel.set_ylim(max(arrayMask.min() * 0.5, 0), arrayMask.max() * 2) try: middle_panel.set_xlim(merged_orders["WAVE"].min().value, merged_orders["WAVE"].max().value) except Exception: middle_panel.set_xlim(merged_orders["WAVE"].min(), merged_orders["WAVE"].max()) # Bottom panel with linear scale for SNR bottom_panel = fig.add_subplot(gs[2, :]) bottom_panel.set_ylabel("SNR", fontsize=10) bottom_panel.set_xlabel("wavelength (nm)", fontsize=10) if not isinstance(qcTable, bool): qcTable = qcTable.drop_duplicates(subset=["qc_name", "qc_order"], keep="last") snrValue = qcTable.loc[qcTable["qc_name"] == "SNR MEDIAN"] orderValue = snrValue["qc_order"].values for i in range(len(orderValue)): try: orderValue[i] = int(orderValue[i]) except (ValueError, TypeError): pass orderValue = np.array(["GLOBAL" if pd.isna(v) else v for v in orderValue]) snrValue = snrValue["qc_value"].values bottom_panel.plot(merged_orders["WAVE"], merged_orders["SNR"], linewidth=0.4, color="black", zorder=1) try: bottom_panel.set_xlim(merged_orders["WAVE"].min().value, merged_orders["WAVE"].max().value) except Exception: bottom_panel.set_xlim(merged_orders["WAVE"].min(), merged_orders["WAVE"].max()) mean, median, std = sigma_clipped_stats(merged_orders["SNR"], sigma=5.0, stdfunc="std", cenfunc="mean", maxiters=3) bottom_panel.set_ylim(0, mean + 4 * std) # ADD SNR VALUES TO BOTTOM PANEL if not isinstance(qcTable, bool) and len(orderValue): _vis_rank = {"GLOBAL": 0, "u": 1, "g": 2, "r": 3, "i": 4} def _order_key(pair): o = pair[0] if o in _vis_rank: return (0, _vis_rank[o]) try: return (1, float(o)) except (ValueError, TypeError): return (2, str(o)) pairs = sorted(zip(orderValue, snrValue), key=_order_key) snr_text = "\n".join(f"{o}: {v:.0f}" for o, v in pairs) bottom_panel.text( 0.99, 0.98, snr_text, transform=bottom_panel.transAxes, ha="right", va="top", fontsize=5, family="monospace", color="black", zorder=1, ) # SKY PANEL WITH SKY FLUX sky_panel = fig.add_subplot(gs[3, :]) sky_panel.set_xlabel("wavelength (nm)", fontsize=10) sky_panel.set_ylabel("sky flux ($e^{-}$)", fontsize=10) sky_panel.set_yscale("log") if "SKY_COUNTS" in merged_orders.columns: sky_panel.plot( merged_orders["WAVE"], merged_orders["SKY_COUNTS"], linewidth=0.3, color="#859900" if not fluxCalibrated else "#2aa198", zorder=1, ) sky_panel.set_ylim(10, merged_orders["SKY_COUNTS"].max() * 1.1) try: sky_panel.set_xlim(merged_orders["WAVE"].min().value, merged_orders["WAVE"].max().value) except Exception: sky_panel.set_xlim(merged_orders["WAVE"].min(), merged_orders["WAVE"].max()) if orderJoins: for k, v in orderJoins.items(): for panel in [top_panel, middle_panel, bottom_panel]: panel.axvline(v, color="black", linestyle="--", linewidth=0.5, alpha=0.5) panel.text( v + 5, 0.9 * panel.get_ylim()[1], "ORDER JOIN", rotation=90, verticalalignment="top", fontsize=6, color="black", alpha=0.5, ) # PLOT SKY LINES AS VERTICAL LINES ON SKY PANEL for _, row in skylinesDF.iterrows(): for panel in [top_panel, middle_panel, bottom_panel, sky_panel]: panel.axvline( row["WAVELENGTH"], color="blue", linestyle="-", linewidth=0.2, alpha=0.08, zorder=0, label="Sky Line" if panel == top_panel else "", ) if fluxCalibrated: filename = filenameTemplate.replace(".fits", f"_EXTRACTED_MERGED_FLUXCALIBRATED_QC_PLOT{noddingSequence}.pdf") else: filename = filenameTemplate.replace(".fits", f"_EXTRACTED_MERGED_QC_PLOT{noddingSequence}.pdf") filePath = f"{qcDir}/{filename}" if debug: plt.show() plt.savefig(filePath, dpi=120, bbox_inches="tight", format="pdf") plt.close("all") utcnow = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S") products = pd.concat( [ products, pd.Series( { "soxspipe_recipe": recipeName, "product_label": ( f"EXTRACTED_MERGED_FLUXCALIBRATED_QC_PLOT{noddingSequence}" if fluxCalibrated else f"EXTRACTED_MERGED_QC_PLOT{noddingSequence}" ), "file_name": filename, "file_type": "PDF", "obs_date_utc": dateObs, "reduction_date_utc": utcnow, "product_desc": "QC plot of extracted order-merged source", "file_path": filePath, "label": "QC", } ) .to_frame() .T, ], ignore_index=True, ) log.debug("completed the ``plot_merged_spectrum_qc`` function") return products, filePath
[docs] def calculate_rolling_snr(dataframe, flux_column, window_size): """ Calculate the rolling Signal-to-Noise Ratio (SNR) for a given column in a pandas DataFrame. This function computes the rolling SNR for a specified column in the DataFrame using a custom rolling window function. The SNR is calculated as the ratio of the median signal to the noise, where the noise is estimated using a robust statistical method. **Key Arguments:** - `dataframe`: The input pandas DataFrame containing the data. - `flux_column`: The name of the column in the DataFrame for which the rolling SNR will be calculated. - `window_size`: The size of the rolling window to use for the calculation. **Return:** - `dataframe`: The input DataFrame with an additional column 'SNR' containing the calculated rolling SNR values. **Usage:** ```python from soxspipe.commonutils.toolkit import calculate_rolling_snr df_with_snr = calculate_rolling_snr(dataframe=df, flux_column='flux', window_size=5) ``` """ import numpy as np from numpy.lib.stride_tricks import sliding_window_view values = dataframe[flux_column].to_numpy(dtype=np.float64, copy=False) n = values.size snr_full = np.full(n, np.nan, dtype=np.float64) # Need at least 5 points for the noise estimator and enough data for one window if window_size >= 5 and n >= window_size: windows = sliding_window_view(values, window_shape=window_size) # Signal: rolling median signal = np.median(windows, axis=1) # Noise: robust estimator based on 5-point second-difference pattern diff = np.abs(2.0 * windows[:, 2:-2] - windows[:, :-4] - windows[:, 4:]) noise = 0.6052697 * np.median(diff, axis=1) snr = np.divide(signal, noise, out=np.full_like(signal, np.nan), where=noise > 0) # Match center=True placement left = (window_size - 1) // 2 snr_full[left : left + snr.size] = snr dataframe["SNR"] = snr_full dataframe["SNR"] = dataframe["SNR"].replace([np.inf, -np.inf], np.nan).bfill().ffill() return dataframe
[docs] def extinction_correction_factor(wave, extinctionTablePath, airmass): from scipy.interpolate import interp1d import numpy as np import matplotlib.pyplot as plt import pandas as pd from astropy.table import Table # READ THE EXTINCTION CURVE FOR THE OBSERVATORY # DATA IS ORGANIZED AS FOLLOWS: # FIRST COLUMN, WAVELENGTH (IN ANGSTROM), SECOND COLUMN MAG/AIRMASS extinctionData = Table.read(extinctionTablePath, format="fits") extinctionData = extinctionData.to_pandas() # CONVERT ANG TO NM wave_ext = extinctionData["WAVE"] / 10 # INTERPOLATING ON THE REQUIRED WAVE SCALE refitted_ext = interp1d( np.array(wave_ext), np.array(extinctionData["MAG_AIRMASS"]), kind="next", fill_value="extrapolate" ) extCorrectionFactor = 10 ** (0.4 * refitted_ext(wave) * airmass) return extCorrectionFactor
[docs] def frame_to_32(frame): """*convert a given frame to 32-bit float format* **Key Arguments:** - `frame` -- the input frame **Returns:** - `frame` -- the converted frame **Usage:** ```python from soxspipe.commonutils.toolkit import frame_to_32 frame = frame_to_32(frame) ``` """ from astropy.nddata import CCDData import numpy as np try: if frame.data.dtype != np.float32: frame.data = frame.data.astype(np.float32, copy=False) except: pass try: frame.uncertainty.array = frame.uncertainty.array.astype(np.float32, copy=False) except: pass return frame
[docs] def add_snr_efficiency_qcs(log, spectrumDF, qcTable, orderJoins, recipeName, dateObs): """*add quality checks to the qc table* **Key Arguments:** - ``log`` -- logger - ``spectrumDF`` -- the dataframe containing the extracted spectrum with a column named 'SNR' or 'EFFICIENCY' to calculate the checks from. - ``qcTable`` -- the qc table to which the SNR checks will be added - ``orderJoins`` -- a dictionary containing the wavelengths of order joins (if any) to be added as QCs. - ``recipeName`` -- name of the recipe to add to the QC entries - ``dateObs`` -- the observation date to add to the QC entries (in ISO format, e.g., "2024-06-01T00:00:00") **Return:** - ``qcTable`` -- the updated qc table with SNR checks added **Usage:** ```python qcTable = add_snr_qcs(log, spectrumDF, qcTable, orderJoins) ``` """ import numpy as np import pandas as pd from datetime import datetime spectrumDF = spectrumDF.copy(deep=False) spectrumDF["ORDER"] = np.nan try: spectrumDF["WAVE"] = spectrumDF["WAVE"].values.value except: pass ## REVERSE DICTIONARY KEYS SO FIRST KEY IS LAST orderJoins = dict(reversed(list(orderJoins.items()))) for i, (orders, wl) in enumerate(zip(orderJoins.keys(), orderJoins.values())): if len(orders) == 2: visOrders = ["i", "r", "g", "u"] lastOrder = visOrders[int(orders[0])] order = visOrders[int(orders[0]) - 1] elif len(orders) == 3: lastOrder = int(orders[:1]) order = int(orders[1:]) else: order = int(orders[:2]) lastOrder = int(orders[2::]) if i == 0: spectrumDF.loc[(spectrumDF["WAVE"] <= wl), "ORDER"] = lastOrder spectrumDF.loc[ spectrumDF["WAVE"] >= wl, "ORDER", ] = order utcnow = datetime.utcnow() utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S") # CALCULATE THE MEDIAN EFFICIENCY ACROSS ALL ORDERS if "EFFICIENCY" in spectrumDF.columns: medianEfficiency = spectrumDF["EFFICIENCY"].median() qcTable = pd.concat( [ qcTable, pd.Series( { "soxspipe_recipe": recipeName, "qc_name": "EFF MEDIAN", "qc_value": float(f"{medianEfficiency:0.2f}"), "qc_comment": "Median efficiency across all orders", "qc_unit": None, "obs_date_utc": dateObs, "reduction_date_utc": utcnow, "to_header": True, } ) .to_frame() .T, ], ignore_index=True, ) # CALCULATE THE MEDIAN EFFICIENCY IN EACH ORDER orderEfficiency = spectrumDF.groupby("ORDER")["EFFICIENCY"].median().reset_index() orderEfficiency.columns = ["ORDER", "MEDIAN_EFFICIENCY"] for _, row in orderEfficiency.iterrows(): qcTable = pd.concat( [ qcTable, pd.Series( { "soxspipe_recipe": recipeName, "qc_name": f"EFF MEDIAN", "qc_value": float(f"{row['MEDIAN_EFFICIENCY']:0.2f}"), "qc_comment": f"Median efficiency in order {row['ORDER']}", "qc_order": row["ORDER"], "qc_unit": None, "obs_date_utc": dateObs, "reduction_date_utc": utcnow, "to_header": True, } ) .to_frame() .T, ], ignore_index=True, ) if "SNR" in spectrumDF.columns: medianSNR = spectrumDF["SNR"].median() qcTable = pd.concat( [ qcTable, pd.Series( { "soxspipe_recipe": recipeName, "qc_name": "SNR MEDIAN", "qc_value": float(f"{medianSNR:0.2f}"), "qc_comment": "Median SNR across all orders", "qc_unit": None, "obs_date_utc": dateObs, "reduction_date_utc": utcnow, "to_header": True, } ) .to_frame() .T, ], ignore_index=True, ) orderSNR = spectrumDF.groupby("ORDER")["SNR"].median().reset_index() orderSNR.columns = ["ORDER", "MEDIAN_SNR"] for _, row in orderSNR.iterrows(): qcTable = pd.concat( [ qcTable, pd.Series( { "soxspipe_recipe": recipeName, "qc_name": f"SNR MEDIAN", "qc_value": float(f"{row['MEDIAN_SNR']:0.2f}"), "qc_comment": f"Median SNR in order {row['ORDER']}", "qc_order": row["ORDER"], "qc_unit": None, "obs_date_utc": dateObs, "reduction_date_utc": utcnow, "to_header": True, } ) .to_frame() .T, ], ignore_index=True, ) return qcTable