Source code for soxspipe.commonutils.create_dispersion_map

#!/usr/bin/env python
# encoding: utf-8
"""
*detect arc-lines on a pinhole frame to generate a dispersion solution*

Author
: Marco Landoni & David Young

Date Created
: September  1, 2020

Module Structure
: This module contains the main `create_dispersion_map` class and helper functions:

  **Main Class:**
  - `create_dispersion_map` - Generates dispersion solutions from pinhole arc frames

  **Helper Functions:**
  - `measure_line_position()` - Detect and measure arc line positions on image stamps
  - `straighten_mph_sets()` - Straighten multi-pinhole sets by projecting onto fitted line
  - `find_largest_cluster_center()` - Find center of largest cluster using DBSCAN
  - `_plot_slit_index_comparisons()` - Debug visualization of slit position residuals

  **Workflow:**
  1. Load predicted line positions from static calibrations
  2. Detect arc lines on pinhole frame iteratively
  3. Calculate shifts between predicted and observed positions
  4. Fit polynomial dispersion solution (wavelength, order, slit)
  5. Generate QC plots and write output files
"""

################# GLOBAL IMPORTS ####################
from soxspipe.commonutils.toolkit import unpack_order_table, read_spectral_format, twoD_disp_map_image_to_dataframe
from soxspipe.commonutils.dispersion_map_to_pixel_arrays import dispersion_map_to_pixel_arrays
from soxspipe.commonutils.filenamer import filenamer
from soxspipe.commonutils.polynomials import chebyshev_order_wavelength_polynomials
from soxspipe.commonutils.toolkit import get_calibrations_path
from os.path import expanduser
from soxspipe.commonutils import detector_lookup
from soxspipe.commonutils import keyword_lookup
from fundamentals import tools
from builtins import object
import sys
import os
from datetime import datetime, timezone

os.environ["TERM"] = "vt100"


[docs] class create_dispersion_map(object): """ *detect arc-lines on a pinhole frame to generate a dispersion solution* **Key Arguments:** - ``log`` -- logger - ``settings`` -- the settings dictionary - ``recipeSettings`` -- the recipe specific settings - ``pinholeFrame`` -- the calibrated pinhole frame (single or multi) - ``firstGuessMap`` -- the first guess dispersion map from the `soxs_disp_solution` recipe (needed in `soxs_spat_solution` recipe). Default *False*. - ``orderTable`` -- the order geometry table - ``qcTable`` -- the data frame to collect measured QC metrics - ``productsTable`` -- the data frame to collect output products - ``sofName`` -- name of the originating SOF file - ``create2DMap`` -- create the 2D image map of wavelength, slit-position and order from disp solution. - ``lineDetectionTable`` -- the list of arc-lines detected on the pinhole frame (used only for pipeline tuning) - ``startNightDate`` -- YYYY-MM-DD date of the observation night. Default "" - ``arcFrame`` -- the calibrated arc frame used to determine spectral resolution. Default *None* - ``debug`` -- debug mode. Default *False* - ``turnOffMP`` -- turn off multiprocessing. Default *False*. **Usage:** ```python from soxspipe.commonutils import create_dispersion_map mapPath, mapImagePath, res_plots, qcTable, productsTable = create_dispersion_map( log=log, settings=settings, pinholeFrame=frame, firstGuessMap=False, qcTable=self.qc, productsTable=self.products, sofName=sofName, create2DMap=True, recipeSettings=recipeSettings, startNightDate=startNightDate, arcFrame=arcFrame, debug=debug, turnOffMP=turnOffMP ).get() ``` """ def __init__( self, log, settings, recipeSettings, pinholeFrame, firstGuessMap=False, orderTable=False, qcTable=False, productsTable=False, sofName=False, create2DMap=True, lineDetectionTable=False, startNightDate="", arcFrame=None, debug=False, turnOffMP=False, ): self.log = log log.debug("instantiating a new 'create_dispersion_map' object") import warnings import copy # STORE INITIALIZATION PARAMETERS self._store_init_params( settings, recipeSettings, pinholeFrame, firstGuessMap, orderTable, qcTable, productsTable, sofName, create2DMap, lineDetectionTable, startNightDate, arcFrame, debug, copy, turnOffMP, ) # SETUP KEYWORD LOOKUP AND EXTRACT FRAME METADATA self._setup_keywords_and_metadata() # VALIDATE NIR ARM LAMP REQUIREMENTS self._validate_nir_lamp_requirements() # DETERMINE RECIPE NAME BASED ON firstGuessMap FLAG self._set_recipe_name() # INITIALIZE DETECTOR PARAMETERS self._setup_detector_params() # CONFIGURE WARNINGS AND LOGGING LEVELS self._configure_warnings_and_logging(warnings) # SET IMAGE ORIENTATION AXES self._set_image_orientation() # CREATE OUTPUT DIRECTORIES FOR QC AND PRODUCTS self._setup_output_directories() return None def _store_init_params( self, settings, recipeSettings, pinholeFrame, firstGuessMap, orderTable, qcTable, productsTable, sofName, create2DMap, lineDetectionTable, startNightDate, arcFrame, debug, copy, turnOffMP, ): """*Store all initialization parameters as instance variables*""" self.settings = settings self.pinholeFrame = pinholeFrame self.firstGuessMap = firstGuessMap self.orderTable = orderTable self.qc = qcTable self.products = productsTable self.sofName = sofName self.create2DMap = create2DMap self.recipeSettings = copy.deepcopy(recipeSettings) self.lineDetectionTable = lineDetectionTable self.startNightDate = startNightDate self.arcFrame = arcFrame self.debug = debug self.turnOffMP = turnOffMP def _setup_keywords_and_metadata(self): """*Initialize keyword lookup and extract frame header metadata*""" from soxspipe.commonutils.toolkit import get_calibration_lamp # SETUP KEYWORD LOOKUP OBJECT kw = keyword_lookup(log=self.log, settings=self.settings).get self.kw = kw # EXTRACT HEADER METADATA self.arm = self.pinholeFrame.header[kw("SEQ_ARM")] self.dateObs = self.pinholeFrame.header[kw("DATE_OBS")] self.inst = self.pinholeFrame.header[kw("INSTRUME")] self.exptime = self.pinholeFrame.header[kw("EXPTIME")] # DETERMINE CALIBRATION LAMP TYPE self.lamp = get_calibration_lamp(log=self.log, frame=self.pinholeFrame, kw=kw) def _validate_nir_lamp_requirements(self): """*Validate that NIR arm has required Argon and Mercury lamps*""" if self.arm.upper() == "NIR": has_ar = "ar" in self.lamp.lower() has_hg = "hg" in self.lamp.lower() if not (has_ar and has_hg): raise Exception("NIR arm requires both Argon (Ar) and Mercury (Hg) lamps") def _set_recipe_name(self): """*Determine recipe name based on whether this is first guess or spatial solution*""" if self.firstGuessMap: self.recipeName = "soxs-spat-solution" else: self.recipeName = "soxs-disp-solution" def _setup_detector_params(self): """*Initialize detector parameters lookup for the current arm*""" self.detectorParams = detector_lookup(log=self.log, settings=self.settings).get(self.arm) def _configure_warnings_and_logging(self, warnings): """*Configure warning filters and reset logging levels*""" from photutils.utils import NoDetectionsWarning import logging # SUPPRESS PHOTUTILS NO DETECTIONS WARNINGS warnings.simplefilter("ignore", NoDetectionsWarning) # FIX ASTROPY LOGGING LEVEL RESET logging.getLogger().setLevel(logging.INFO + 5) def _set_image_orientation(self): """*Set dispersion and spatial axes based on detector configuration*""" if self.detectorParams["dispersion-axis"] == "x": self.axisA = "x" self.axisB = "y" else: self.axisA = "y" self.axisB = "x" def _setup_output_directories(self): """*Create QC and product output directories*""" from soxspipe.commonutils.toolkit import utility_setup self.qcDir, self.productDir = utility_setup( log=self.log, settings=self.settings, recipeName=self.recipeName, startNightDate=self.startNightDate ) def _initialize_recipe_settings(self): """*INITIALIZE POLYNOMIAL DEGREES AND DETECTION WINDOW SETTINGS*""" # READ BOOTSTRAP AND FITTING PARAMETERS FROM SETTINGS bootstrap_dispersion_solution = self.settings["bootstrap_dispersion_solution"] tightFit = self.settings["bootstrap_dispersion_solution"] orderDeg = self.recipeSettings["order-deg"] wavelengthDeg = self.recipeSettings["wavelength-deg"] self.windowSize = self.recipeSettings["pixel-window-size"] # DETERMINE SLIT POLYNOMIAL DEGREE BASED ON FRAME TYPE if self.firstGuessMap: slitDeg = self.recipeSettings["slit-deg"] else: # SINGLE PINHOLE: NO SLIT VARIATION slitDeg = [0, 0] if isinstance(orderDeg, list) else 0 return bootstrap_dispersion_solution, tightFit, orderDeg, wavelengthDeg, slitDeg def _prepare_pinhole_frame(self): """*MASK FRAME AND CALCULATE STATISTICS FOR LINE DETECTION*""" import numpy as np from astropy.stats import sigma_clipped_stats # CREATE MASKED ARRAY FROM FRAME DATA pinholeFrameMasked = np.ma.array(self.pinholeFrame.data, mask=self.pinholeFrame.mask) # CALCULATE ROBUST STATISTICS FOR BACKGROUND ESTIMATION mean, median, std = sigma_clipped_stats( pinholeFrameMasked, sigma=5.0, stdfunc="mad_std", cenfunc="median", maxiters=3 ) # STORE FOR LATER USE IN LINE DETECTION self.meanFrameFlux = mean self.stdFrameFlux = std self.pinholeFrameMasked = pinholeFrameMasked return pinholeFrameMasked def _generate_quicklook_and_debug_plots(self, orderPixelTable, pinholeFrameMasked): """*GENERATE QUICKLOOK IMAGE AND OPTIONAL DEBUG PLOTS*""" from soxspipe.commonutils.toolkit import quicklook_image # GENERATE QUICKLOOK IMAGE WITH SURFACE PLOT quicklook_image( log=self.log, CCDObject=self.pinholeFrame, show=self.debug, ext=False, stdWindow=3, surfacePlot=True, title="Pinhole Frame", ) # DISPLAY DEBUG PLOT OF PREDICTED LINE WINDOWS IF IN DEBUG MODE if self.debug: self._plot_predicted_line_windows(orderPixelTable, pinholeFrameMasked) def _calculate_position_differences(self, orderPixelTable): """*CALCULATE DIFFERENCES BETWEEN PREDICTED AND OBSERVED POSITIONS*""" import numpy as np # CALCULATE X AND Y DIFFERENCES orderPixelTable["x_diff"] = orderPixelTable["detector_x_shifted"] - orderPixelTable["observed_x"] orderPixelTable["y_diff"] = orderPixelTable["detector_y_shifted"] - orderPixelTable["observed_y"] # CALCULATE EUCLIDEAN DISTANCE orderPixelTable["xy_diff"] = np.sqrt( np.square(orderPixelTable["x_diff"]) + np.square(orderPixelTable["y_diff"]) ) if "mph_mean_x" not in orderPixelTable.columns or "mph_mean_y" not in orderPixelTable.columns: # FOR EACH UNIQUE (wavelength, order), ADD mph_mean_x AND mph_mean_y COLUMNS # GROUP BY (wavelength, order) AND CALCULATE MEAN detector_x AND detector_y means = ( orderPixelTable.groupby(["wavelength", "order"])[["detector_x", "detector_y"]] .mean() .rename(columns={"detector_x": "mph_mean_x", "detector_y": "mph_mean_y"}) ) # MERGE THE MEANS BACK TO THE ORIGINAL DATAFRAME orderPixelTable = orderPixelTable.merge(means, on=["wavelength", "order"], how="left") return orderPixelTable def _get_detection_parameters(self, iteration, tightFit): """*DETERMINE DETECTION PARAMETERS BASED ON ITERATION NUMBER*""" # INITIALIZE DEFAULT PARAMETERS returnAll = False brightest = False exclude_border = True if tightFit: # USE STRICT WINDOW SIZE FOR TIGHT FITTING windowHalf = round(self.windowSize / 2) sigmaLimit = self.recipeSettings["pinhole-detection-thres-sigma"] else: # PROGRESSIVELY REFINE SEARCH WINDOW AND DETECTION THRESHOLD if iteration == 0: # ITERATION 0: WIDE SEARCH TO CATCH ALL POTENTIAL LINES windowHalf = min(round(self.windowSize * 3), 25) sigmaLimit = ( 10 if not self.firstGuessMap else max(self.recipeSettings["pinhole-detection-thres-sigma"], 5) ) returnAll = not self.firstGuessMap elif iteration == 1: # ITERATION 1: INTERMEDIATE REFINEMENT windowHalf = min(round(self.windowSize * 2), 10 if not self.firstGuessMap else 8) sigmaLimit = ( 10 if not self.firstGuessMap else max(self.recipeSettings["pinhole-detection-thres-sigma"], 2) ) returnAll = not self.firstGuessMap else: # ITERATION 2+: FINAL PRECISE SEARCH windowHalf = round(self.windowSize / 2) sigmaLimit = self.recipeSettings["pinhole-detection-thres-sigma"] return windowHalf, sigmaLimit, returnAll, brightest, exclude_border def _explode_multiple_detections(self, orderPixelTable): """*EXPLODE DATAFRAME WHEN MULTIPLE LINES DETECTED FOR SINGLE PREDICTION*""" import pandas as pd # FIND COLUMNS CONTAINING LISTS (MULTIPLE DETECTIONS) exploded_columns = [col for col in orderPixelTable.columns if isinstance(orderPixelTable[col].iloc[0], list)] # EXPLODE LISTS INTO SEPARATE ROWS orderPixelTable = orderPixelTable.explode(exploded_columns, ignore_index=True) # CONVERT STRING VALUES BACK TO NUMERIC for col in exploded_columns: orderPixelTable[col] = pd.to_numeric(orderPixelTable[col], errors="coerce") return orderPixelTable def _write_qc_metrics(self, totalLines, detectedLines, percentageDetectedLines, percentageGoodLines, utcnow): """*WRITE LINE DETECTION QC METRICS TO TABLE*""" import pandas as pd # DETERMINE TAG BASED ON RECIPE TYPE tag = "single" if "DISP" in self.recipeName.upper() else "multi" qc_names = ["DETLINES TOT", "DETLINES NUM", "DETLINES FRAC", "GOODLINES FRAC"] qc_comments = [ f"Total number of line in {tag} line-list", f"Number of lines detected in {tag} pinhole frame", f"Proportion of input line-list lines detected on {tag} pinhole frame", f"Proportion of good, unclipped lines in {tag} pinhole frame", ] qc_values = [totalLines, detectedLines, percentageDetectedLines, percentageGoodLines] qc_units = ["lines", "lines", None, None] for qc_name, qc_value, qc_comment, qc_unit in zip(qc_names, qc_values, qc_comments, qc_units): self.qc = pd.concat( [ self.qc, pd.Series( { "soxspipe_recipe": self.recipeName, "qc_name": qc_name, "qc_value": qc_value, "qc_comment": qc_comment, "qc_unit": qc_unit, "obs_date_utc": self.dateObs, "reduction_date_utc": utcnow, "to_header": True, } ) .to_frame() .T, ], ignore_index=True, ) def _find_cluster_center_with_fallback(self, x_data, y_data, fallback_median): """*FIND CLUSTER CENTER USING DBSCAN WITH AUTOMATIC PARAMETER ADJUSTMENT*""" # START WITH MODERATE CLUSTERING PARAMETERS centre = None eps = 0.7 # PROGRESSIVELY RELAX PARAMETERS IF NO CLUSTER FOUND while not centre and eps < 2.0: min_samples = 25 while not centre and min_samples > 5: centre_x, centre_y = find_largest_cluster_center(x_data, y_data, eps=eps, min_samples=min_samples) if centre_x: centre = (centre_x, centre_y) min_samples -= 1 if not centre: eps += 0.1 # FALLBACK TO MEDIAN IF NO CLUSTER FOUND if not centre: centre = (fallback_median, None) return centre def _calculate_order_shift_statistics(self, orderPixelTable, mask): """*CALCULATE SHIFT STATISTICS FOR A SINGLE ORDER*""" from astropy.stats import sigma_clipped_stats import numpy as np # CALCULATE XY DISTANCE STATISTICS meanxy, medianxy, stdxy = sigma_clipped_stats( orderPixelTable.loc[mask]["xy_diff"], sigma=1.0, stdfunc="std", cenfunc="mean", maxiters=20 ) # CALCULATE X AND Y SHIFT STATISTICS SEPARATELY updatedMask = mask meanx, medianx, stdx = sigma_clipped_stats( orderPixelTable.loc[updatedMask]["x_diff"], sigma=1.5, stdfunc="std", cenfunc="mean", maxiters=7 ) meany, mediany, stdy = sigma_clipped_stats( orderPixelTable.loc[updatedMask]["y_diff"], sigma=1.5, stdfunc="std", cenfunc="mean", maxiters=7 ) return medianx, mediany, stdx, stdy, medianxy, stdxy def _find_and_apply_cluster_shift(self, orderPixelTable, mask): """*FIND CLUSTER CENTER IN SHIFT DISTRIBUTION AND APPLY CORRECTION*""" # FIND CLUSTER CENTER IN SHIFT SPACE centrex = None centrey = None eps = 0.7 orderLen = len(orderPixelTable.loc[mask].index) # PROGRESSIVELY RELAX CLUSTERING PARAMETERS while not centrex and eps < 1.2 and orderLen: min_samples = 25 while not centrex and min_samples > 10: centrex, centrey = find_largest_cluster_center( orderPixelTable.loc[mask]["x_diff"], orderPixelTable.loc[mask]["y_diff"], eps=eps, min_samples=min_samples, ) min_samples -= 1 if not centrex: eps += 0.1 # APPLY SHIFT CORRECTION IF CLUSTER FOUND if centrex and centrey: orderPixelTable.loc[mask, "detector_x_shifted"] = orderPixelTable.loc[mask]["detector_x_shifted"] - centrex orderPixelTable.loc[mask, "detector_y_shifted"] = orderPixelTable.loc[mask]["detector_y_shifted"] - centrey return orderPixelTable, centrex, centrey def _handle_multipin_hole_big_shift(self, orderPixelTable, order_num, mask, iteration): """*DETECT AND CORRECT LARGE SHIFTS BETWEEN SINGLE/MULTI-PINHOLE FRAMES*""" from astropy.stats import sigma_clipped_stats import numpy as np # ONLY CHECK ON FIRST ITERATION if iteration > 1: return orderPixelTable if len(orderPixelTable.loc[(mask)].index) < 50: self.log.error("COULD NOT DETECT ANY PINHOLES. PLEASE CHECK THE RAW FRAMES") raise ValueError("COULD NOT DETECT ANY PINHOLES. PLEASE CHECK THE RAW FRAMES") # CALCULATE MEDIAN SHIFTS FOR TOP AND BOTTOM SLIT POSITIONS _, medTop, _ = sigma_clipped_stats( orderPixelTable.loc[(mask & (orderPixelTable["slit_index"] == 8))][f"{self.axisA}_diff"], sigma=1.0, stdfunc="mad_std", cenfunc="median", maxiters=3, ) _, medBottom, _ = sigma_clipped_stats( orderPixelTable.loc[(mask & (orderPixelTable["slit_index"] == 0))][f"{self.axisA}_diff"], sigma=1.0, stdfunc="mad_std", cenfunc="median", maxiters=3, ) # FIND CLUSTER CENTERS FOR TOP AND BOTTOM SLITS centreTop = self._find_cluster_center_with_fallback( orderPixelTable.loc[(mask & (orderPixelTable["slit_index"] == 8))][f"{self.axisA}_diff"], orderPixelTable.loc[(mask & (orderPixelTable["slit_index"] == 8))][f"{self.axisB}_diff"], medTop, ) centreBottom = self._find_cluster_center_with_fallback( orderPixelTable.loc[(mask & (orderPixelTable["slit_index"] == 0))][f"{self.axisA}_diff"], orderPixelTable.loc[(mask & (orderPixelTable["slit_index"] == 0))][f"{self.axisB}_diff"], medBottom, ) centreATop, centreBTop = centreTop centreABottom, centreBBottom = centreBottom # CHECK IF SHIFT IS SIGNIFICANT (> 0.4 PIXELS) if abs(centreATop - centreABottom) > 0.4: # USE LARGER SHIFT shift = centreATop if abs(centreATop) > abs(centreABottom) else centreABottom # print(f"{order_num} APPLYING BIG SHIFT {shift}") # APPLY CORRECTION orderPixelTable.loc[mask, f"detector_{self.axisA}_shifted"] = ( orderPixelTable.loc[mask][f"detector_{self.axisA}_shifted"] - shift ) # DEBUG PLOTS IF ENABLED if self.debug: print("ERERERE") _plot_slit_index_comparisons(orderPixelTable.loc[(mask & (orderPixelTable["slit_index"] == 8))]) _plot_slit_index_comparisons(orderPixelTable.loc[(mask & (orderPixelTable["slit_index"] == 0))]) elif self.debug: _plot_slit_index_comparisons(orderPixelTable.loc[mask]) return orderPixelTable def _reduce_polynomial_degrees(self, popt_x, popt_y, wavelengthDeg, orderDeg, slitDeg): """*REDUCE POLYNOMIAL DEGREES WHEN FIT FAILS*""" # HANDLE DIFFERENT POLYNOMIAL DEGREE FORMATS if not isinstance(wavelengthDeg, list): # SINGLE DEGREE FOR BOTH AXES degList = [wavelengthDeg, orderDeg, slitDeg] degList[degList.index(max(degList))] -= 1 wavelengthDeg, orderDeg, slitDeg = degList elif popt_x == "xerror": # REDUCE X-AXIS DEGREES degList = [wavelengthDeg[0], orderDeg[0], slitDeg[0]] degList[degList.index(max(degList))] -= 1 wavelengthDeg[0], orderDeg[0], slitDeg[0] = degList elif popt_y == "yerror": # REDUCE Y-AXIS DEGREES degList = [wavelengthDeg[1], orderDeg[1], slitDeg[1]] degList[degList.index(max(degList))] -= 1 wavelengthDeg[1], orderDeg[1], slitDeg[1] = degList # UPDATE RECIPE SETTINGS self.recipeSettings["order-deg"] = orderDeg self.recipeSettings["wavelength-deg"] = wavelengthDeg if self.firstGuessMap: self.recipeSettings["slit-deg"] = slitDeg return wavelengthDeg, orderDeg, slitDeg
[docs] def get(self): """ *generate the dispersion map* **Return:** - ``mapPath`` -- path to the file containing the coefficients of the x,y polynomials of the global dispersion map fit **Workflow:** 1. LOAD PREDICTED LINE POSITIONS FROM CALIBRATION FILES 2. PREPARE PINHOLE FRAME (MASKING, STATISTICS) 3. ITERATIVELY DETECT ARC LINES ON FRAME 4. FIT POLYNOMIAL DISPERSION SOLUTION 5. WRITE OUTPUTS (LINE LISTS, MAP FILES, QC PLOTS) """ self.log.debug("starting the ``get`` method") import pandas as pd from astropy.table import Table import numpy as np from astropy.stats import sigma_clipped_stats from astropy.stats import sigma_clip # STEP 1: INITIALIZE RECIPE SETTINGS AND POLYNOMIAL DEGREES bootstrap_dispersion_solution, tightFit, orderDeg, wavelengthDeg, slitDeg = self._initialize_recipe_settings() # STEP 2: LOAD PREDICTED LINE POSITIONS FROM CALIBRATIONS orderPixelTable = self.get_predicted_line_list() originalOrderPixelTable = orderPixelTable.copy() totalLines = len(orderPixelTable.index) self.uniqueSlitPos = orderPixelTable["slit_position"].unique() # STEP 3: PREPARE PINHOLE FRAME FOR LINE DETECTION pinholeFrameMasked = self._prepare_pinhole_frame() # STEP 4: GENERATE QUICKLOOK IMAGE AND DEBUG PLOTS self._generate_quicklook_and_debug_plots(orderPixelTable, pinholeFrameMasked) boost = True while boost: # SORT BY COLUMN NAME orderPixelTable.sort_values(["wavelength"], inplace=True) # BOOST WILL BE SET TO TRUE LATER IF FOUND TO BE TRUE IN THE SETTINGS FILE boost = False bigShift = False if not isinstance(self.lineDetectionTable, bool): # USE THE PROVIDED LINE DETECTION TABLE (FOR PIPELINE TUNING ONLY) orderPixelTable = self.lineDetectionTable.copy() else: # DETECT THE LINES ON THE PINHOLE FRAME AND # ADD OBSERVED LINES TO DATAFRAME iteration = 0 self.log.print(f"\n# FINDING PINHOLE ARC-LINES ON IMAGE\n") iraf = False while iteration < 3: # GET DETECTION PARAMETERS FOR CURRENT ITERATION self.windowHalf, sigmaLimit, returnAll, brightest, exclude_border = self._get_detection_parameters( iteration, tightFit ) # DETECT LINES ON PINHOLE FRAME orderPixelTable = self.detect_pinhole_arc_lines( orderPixelTable=orderPixelTable, iraf=iraf, sigmaLimit=sigmaLimit, iteration=iteration, brightest=brightest, exclude_border=exclude_border, returnAll=returnAll, ) iteration += 1 # USE IRAF STAR FINDER FOR SUBSEQUENT ITERATIONS iraf = True # INITIALIZE SHIFTED DETECTOR COLUMNS ON FIRST PASS if "detector_x_shifted" not in orderPixelTable.columns: orderPixelTable["detector_x_shifted"] = orderPixelTable["detector_x"] orderPixelTable["detector_y_shifted"] = orderPixelTable["detector_y"] # EXPLODE MULTIPLE DETECTIONS INTO SEPARATE ROWS orderPixelTable = self._explode_multiple_detections(orderPixelTable) # CALCULATE POSITION DIFFERENCES BETWEEN PREDICTED AND OBSERVED orderPixelTable = self._calculate_position_differences(orderPixelTable) # STEP 5: APPLY SHIFT CORRECTIONS ORDER-BY-ORDER if self.arm.upper() == "VIS" and self.inst.upper() == "SOXS": # VIS ARM OF SOXS: GROUP BY SHIFT GROUPS orderPixelTable["shift_group"] = orderPixelTable["order"] else: # NIR ARM OF SOXS OR OTHER INSTRUMENTS # ASSIGN SHIFT GROUP BASED ON NxN GRID OF mph_mean_x AND mph_mean_y # SET GRID SIZE (N=3 MEANS 3x3 GRID) grid_size = 3 x = orderPixelTable["mph_mean_x"] y = orderPixelTable["mph_mean_y"] # COMPUTE BIN EDGES x_bins = np.linspace(x.min(), x.max(), grid_size + 1) y_bins = np.linspace(y.min(), y.max(), grid_size + 1) # DIGITIZE TO GET GRID INDICES (1 to grid_size) x_idx = np.digitize(x, x_bins, right=False) y_idx = np.digitize(y, y_bins, right=False) # CLIP TO 1-grid_size (IN CASE OF EDGE CASES) x_idx = np.clip(x_idx, 1, grid_size) y_idx = np.clip(y_idx, 1, grid_size) # GRID CELL NUMBER: (row-major, 1 to grid_size^2) orderPixelTable["shift_group"] = (y_idx - 1) * grid_size + x_idx shiftGroups = orderPixelTable["shift_group"].unique() for sg in shiftGroups: mask = (orderPixelTable["shift_group"] == sg) & (orderPixelTable["observed_x"].notnull()) # CHECK FOR LARGE SHIFTS IN MULTI-PINHOLE FRAMES if self.firstGuessMap and bigShift is False: orderPixelTable = self._handle_multipin_hole_big_shift(orderPixelTable, sg, mask, iteration) continue # FIND AND APPLY CLUSTER-BASED SHIFT CORRECTION orderPixelTable, centrex, centrey = self._find_and_apply_cluster_shift(orderPixelTable, mask) # CALCULATE SHIFT STATISTICS FOR QC REPORTING medianx, mediany, stdx, stdy, medianxy, stdxy = self._calculate_order_shift_statistics( orderPixelTable, mask ) # PRINT DEBUG INFORMATION IF ENABLED if self.debug: print(f"# SHIFT GROUP {sg}") print(f"StdXY: {stdxy:.3f}, MedianXY: {medianxy:.3f}") print(f"StdX: {stdx:.3f}, StdY: {stdy:.3f}") print(f"MedianX: {medianx:.3f}, MedianY: {mediany:.3f}") if centrex: print(f"centrex: {centrex:.3f}, centrey: {centrey:.3f}") else: print("no centre found") print(f"Length shift group {sg}:", len(orderPixelTable.loc[mask].index)) _plot_slit_index_comparisons(orderPixelTable.loc[mask]) print() # SKIP ORDER IF NO VALID LINES FOUND if np.isnan(medianx) or np.isnan(mediany): self.log.warning(f"Could not find any arc lines in shift group {sg}.") continue # PRINT PROGRESS UPDATE sys.stdout.flush() # sys.stdout.write("\x1b[1A\x1b[2K") self.log.print( f"\t ITERATION {iteration}: Median X Y difference between predicted " f"and measured positions: {medianx:0.5f},{mediany:0.5f} " f"(stdx,stdy = {stdx:0.2f},{stdy:0.2f}) (shift group {sg})" ) # RETURN TO DISCRETE PREDICTED LINE POSITIONS BY REMOVING DUPLICATE DETECTOR_X/Y VALUES orderPixelTable.drop_duplicates( subset=["detector_x", "detector_y"], keep="first", inplace=True, ignore_index=True ) bigShift = True # MAKE A CLEAN COPY OF THE DETECTION TABLE ... USED FOR PIPELINE TUNING ONLY lineDetectionTable = orderPixelTable.copy() # COLLECT MISSING LINES mask = orderPixelTable["observed_x"].isnull() missingLines = orderPixelTable.loc[mask] # GROUP RESULTS BY WAVELENGTH lineGroups = missingLines.groupby(["wavelength", "order"]) lineGroups = lineGroups.size().to_frame(name="count").reset_index() # CREATE THE LIST OF INCOMPLETE MULTIPINHOLE WAVELENGTHS & ORDER SETS TO DROP orderPixelTable["dropped"] = False if "SPAT" in self.recipeName.upper(): missingLineThreshold = 9 - self.recipeSettings["mph_line_set_min"] mask = lineGroups["count"] > missingLineThreshold lineGroups = lineGroups.loc[mask] setsToDrop = lineGroups[["wavelength", "order"]] s = orderPixelTable[["wavelength", "order"]].merge(setsToDrop, indicator=True, how="left") s["dropped"] = False s.loc[(s["_merge"] == "both"), "dropped"] = True orderPixelTable["droppedOnMissing"] = s["dropped"].values orderPixelTable.loc[(orderPixelTable["droppedOnMissing"] == True), "dropped"] = True # DROP MISSING VALUES orderPixelTable.dropna(axis="index", how="any", subset=["observed_x"], inplace=True) detectedLines = len(orderPixelTable.index) if self.firstGuessMap and False: orderPixelTable = ( orderPixelTable.groupby(["wavelength", "order"]).apply(straighten_mph_sets).reset_index(drop=True) ) # CALCULATE LINE DETECTION STATISTICS percentageDetectedLines = float("{:.6f}".format(float(detectedLines) / float(totalLines))) # GET CURRENT UTC TIMESTAMP FOR QC RECORDS utcnow = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") # GROUP FOUND LINES INTO SETS AND CLIP ON MEAN XY SHIFT RESIDUALS if True: orderPixelTable = self._clip_on_measured_line_metrics(orderPixelTable) else: orderPixelTable["fwhm_pin_px"] = 3.0 # STEP 6: ITERATIVELY FIT POLYNOMIAL SOLUTIONS WITH AUTOMATIC DEGREE REDUCTION fitFound = False tryCount = 0 while not fitFound and tryCount < 5: # ATTEMPT TO FIT POLYNOMIAL DISPERSION SOLUTION popt_x, popt_y, goodLinesTable, clippedLinesTable = self.fit_polynomials( orderPixelTable=orderPixelTable, wavelengthDeg=wavelengthDeg, orderDeg=orderDeg, slitDeg=slitDeg, missingLines=missingLines, ) # CHECK IF FIT CONVERGED SUCCESSFULLY if isinstance(popt_x, np.ndarray) and isinstance(popt_y, np.ndarray): fitFound = True else: # IN TUNING MODE, FAIL IMMEDIATELY WITHOUT RETRY if self.settings["tune-pipeline"]: raise ArithmeticError( "Could not converge on a good fit to the dispersion solution. " "Please check the quality of your data or adjust your fitting parameters." ) # REDUCE POLYNOMIAL DEGREES AND RETRY wavelengthDeg, orderDeg, slitDeg = self._reduce_polynomial_degrees( popt_x, popt_y, wavelengthDeg, orderDeg, slitDeg ) self.log.print( f"Wavelength, Order and Slit fitting orders reduced to " f"{wavelengthDeg}, {orderDeg}, {slitDeg} to try and achieve a dispersion solution." ) tryCount += 1 if tryCount == 5: self.log.error( "Could not converge on a good fit to the dispersion solution after 5 attempts. " "Please check the quality of your data or adjust your fitting parameters." ) raise ArithmeticError( "Could not converge on a good fit to the dispersion solution. " "Please check the quality of your data or adjust your fitting parameters." ) if bootstrap_dispersion_solution: # WRITE THE MAP TO FILE mapPath = self.write_map_to_file(popt_x, popt_y, orderDeg, wavelengthDeg, slitDeg) # orderPixelTable = self.update_static_line_list_detector_positions(originalOrderPixelTable, mapPath) orderPixelTable = self.create_new_static_line_list(dispersionMapPath=mapPath) boost = True bootstrap_dispersion_solution = False # STEP 7: GENERATE OUTPUT FILENAMES goodLinesFN, missingLinesFN = self._get_output_filenames() # CALCULATE GOOD LINES PERCENTAGE STATISTICS percentageGoodLines = float("{:.6f}".format(float(len(goodLinesTable.index)) / float(totalLines))) # WRITE LINE DETECTION QC METRICS self._write_qc_metrics(totalLines, detectedLines, percentageDetectedLines, percentageGoodLines, utcnow) # STEP 8: PREPARE LINE LISTS WITH PROPER COLUMNS AND QC METRICS self._write_line_list_qc(clippedLinesTable, utcnow) goodAndClippedLines, goodLinesTable = self._prepare_line_list_columns(goodLinesTable, clippedLinesTable) # STEP 9: WRITE FITTED LINES TO FILE self._write_fitted_lines_file(goodAndClippedLines, goodLinesFN, utcnow) # STEP 10: WRITE MISSING LINES TO FILE self._write_missing_lines_file(missingLines, missingLinesFN, utcnow) if self.firstGuessMap: detectedLineGroups = goodLinesTable.groupby(["wavelength", "order"]).size().reset_index(name="count") modeCounts = ( detectedLineGroups.groupby("order")["count"] .agg(lambda x: x.mode().iloc[0] if not x.mode().empty else 0) .reset_index(name="pinhole count") ) if self.inst.upper() == "SOXS": # REMOVE ORDER = 10 modeCounts = modeCounts[~modeCounts["order"].isin([10, 11, 12])] self.minpin = modeCounts["pinhole count"].min() self.qc = pd.concat( [ self.qc, pd.Series( { "soxspipe_recipe": self.recipeName, "qc_name": "PINHOLE COUNT MIN", "qc_value": self.minpin, "qc_comment": "[pinholes] Minimum number of pinholes detected in any order", "qc_unit": "pinholes", "obs_date_utc": self.dateObs, "reduction_date_utc": utcnow, "to_header": True, } ) .to_frame() .T, ], ignore_index=True, ) # WRITE THE MAP TO FILE if not self.settings["tune-pipeline"]: mapPath = self.write_map_to_file(popt_x, popt_y, orderDeg, wavelengthDeg, slitDeg) else: mapPath = None self.create2DMap = False if self.firstGuessMap and self.orderTable and self.create2DMap and self.minpin == 9: mapImagePath = self.map_to_image(dispersionMapPath=mapPath, orders=list(goodLinesTable["order"].unique())) res_plots = self._create_dispersion_map_qc_plot( xcoeff=popt_x, ycoeff=popt_y, orderDeg=orderDeg, wavelengthDeg=wavelengthDeg, slitDeg=slitDeg, orderPixelTable=goodLinesTable, missingLines=missingLines, allClippedLines=clippedLinesTable, dispMap=mapPath, dispMapImage=mapImagePath, ) # if self.CLINE > 160 and self.arm == "VIS": # raise ValueError(f"Total number of clipped lines {self.CLINE} is above threshold.") # elif self.CLINE > 2000 and self.arm == "NIR": # raise ValueError(f"Total number of clipped lines {self.CLINE} is above threshold.") return mapPath, mapImagePath, res_plots, self.qc, self.products, lineDetectionTable res_plots = self._create_dispersion_map_qc_plot( xcoeff=popt_x, ycoeff=popt_y, orderDeg=orderDeg, wavelengthDeg=wavelengthDeg, slitDeg=slitDeg, orderPixelTable=goodLinesTable, missingLines=missingLines, allClippedLines=clippedLinesTable, ) self.log.debug("completed the ``get`` method") return mapPath, None, res_plots, self.qc, self.products, lineDetectionTable
[docs] def get_predicted_line_list(self): """*lift the predicted line list from the static calibrations* **Return:** - ``orderPixelTable`` -- a panda's data-frame containing wavelength,order,slit_index,slit_position,detector_x,detector_y """ self.log.debug("starting the ``get_predicted_line_list`` method") from astropy.table import Table from astropy.stats import sigma_clipped_stats import numpy as np # DETERMINE FRAME TYPE (SINGLE OR MULTI-PINHOLE) frameTech = self._determine_frame_tech() # GET BINNING PARAMETERS binx, biny = self._get_binning_params() # LOAD PREDICTED LINE LIST FROM CALIBRATIONS orderPixelTable = self._load_predicted_lines(frameTech, binx, biny) # CLEAN AND PREPARE LINE LIST orderPixelTable = self._clean_line_list(orderPixelTable) # APPLY COORDINATE TRANSFORMATIONS orderPixelTable = self._apply_coordinate_transforms(orderPixelTable) # FILTER TO SPECIFIC SLIT INDEX IF NOT FIRST GUESS MAP if not self.firstGuessMap: orderPixelTable = self._filter_to_mid_slit(orderPixelTable) # APPLY FIRST GUESS CORRECTIONS IF AVAILABLE if self.firstGuessMap: orderPixelTable = self._apply_first_guess_corrections(orderPixelTable) self.log.debug("completed the ``get_predicted_line_list`` method") return orderPixelTable
def _determine_frame_tech(self): """*Determine if frame is single or multi-pinhole*""" kw = self.kw tech = self.pinholeFrame.header[kw("DPR_TECH")] if tech == "ECHELLE,PINHOLE": return "single" elif tech == "ECHELLE,MULTI-PINHOLE": return "multi" else: raise TypeError("The input frame needs to be a calibrated single- or multi-pinhole arc lamp frame") def _get_binning_params(self): """*Extract binning parameters from frame header*""" kw = self.kw if self.arm != "NIR" and kw("WIN_BINX") in self.pinholeFrame.header: binx = int(self.pinholeFrame.header[kw("WIN_BINX")]) biny = int(self.pinholeFrame.header[kw("WIN_BINY")]) else: binx, biny = 1, 1 return binx, biny def _load_predicted_lines(self, frameTech, binx, biny): """*Load predicted line list from calibration files*""" from astropy.table import Table calibrationPath = get_calibrations_path(log=self.log, settings=self.settings) predictedFile = self.detectorParams["predicted pinhole lines"][frameTech][f"{binx}x{biny}"] fullPath = f"{calibrationPath}/{predictedFile}" # LOAD AND CONVERT TO PANDAS dat = Table.read(fullPath, format="fits") return dat.to_pandas() def _clean_line_list(self, df): """*Clean and standardize line list dataframe*""" # STANDARDIZE COLUMN NAMES df.columns = [c.lower() if c.lower() in ["order", "wavelength"] else c for c in df.columns] # SET COLUMN DTYPES floatCols = ["wavelength", "slit_position", "detector_x", "detector_y", "order"] intCols = ["slit_index"] stringCols = ["ion"] for col in floatCols: if col in df.columns: try: df[col] = df[col].astype(float) except: pass for col in intCols: if col in df.columns: try: df[col] = df[col].astype(int) except: pass for col in stringCols: if col in df.columns: try: df[col] = df[col].astype(str) except: pass # REMOVE FLAGGED LINES if "delete" in df.columns: mask = df["delete"] == 1 df.drop(index=df[mask].index, inplace=True) # REMOVE DUPLICATES df.drop_duplicates(inplace=True) return df def _apply_coordinate_transforms(self, df): """*Apply coordinate system transformations (FITS to Python indexing)*""" # PHOTUTILS: BOTTOM LEFT PIXEL CENTER IS (0,0) # FITS WCS: BOTTOM LEFT PIXEL CENTER IS (1,1) if self.inst != "SOXS": df["detector_x"] -= 1.0 df["detector_y"] -= 1.0 elif self.arm in ["NIR", "VIS"]: # CALCULATE SCIENCE PIXEL DIMENSIONS FOR SOXS dp = self.detectorParams science_pixels = dp["science-pixels"] xlen = science_pixels["columns"]["end"] - science_pixels["columns"]["start"] ylen = science_pixels["rows"]["end"] - science_pixels["rows"]["start"] return df def _filter_to_mid_slit(self, df): """*Filter line list to only include mid-slit position*""" slitIndex = int(self.detectorParams["mid_slit_index"]) mask = df["slit_index"] == slitIndex return df.loc[mask] def _apply_first_guess_corrections(self, df): """*Apply systematic shifts from first guess dispersion solution*""" from astropy.stats import sigma_clipped_stats import numpy as np slitIndex = int(self.detectorParams["mid_slit_index"]) # GET PREDICTED PIXEL VALUES FROM FIRST GUESS MAP df = dispersion_map_to_pixel_arrays(log=self.log, dispersionMapPath=self.firstGuessMap, orderPixelTable=df) df["detector_x_shifted"] = df["detector_x"] df["detector_y_shifted"] = df["detector_y"] # CALCULATE SHIFTS BETWEEN PREDICTED AND ACTUAL POSITIONS tmpList = df.copy() mask = tmpList["slit_index"] == slitIndex tmpList.loc[mask, "shift_x"] = tmpList.loc[mask, "detector_x"].values - tmpList.loc[mask, "fit_x"].values tmpList.loc[mask, "shift_y"] = tmpList.loc[mask, "detector_y"].values - tmpList.loc[mask, "fit_y"].values tmpList.loc[mask, "shift_xy"] = np.sqrt( tmpList.loc[mask, "shift_x"].values ** 2 + tmpList.loc[mask, "shift_y"].values ** 2 ) # EXTRACT SHIFT COLUMNS tmpList = tmpList.loc[tmpList["shift_x"].notnull(), ["wavelength", "order", "shift_x", "shift_y", "shift_xy"]] # CALCULATE MEDIAN SHIFTS PER ORDER, REPLACING OUTLIERS uniqueOrders = df["order"].unique() for o in uniqueOrders: mask = tmpList["order"] == o meanxy, medianxy, stdxy = sigma_clipped_stats( tmpList.loc[mask, "shift_xy"], sigma=3.0, stdfunc="mad_std", cenfunc="median", maxiters=3 ) meanx, medianx, stdx = sigma_clipped_stats( tmpList.loc[mask, "shift_x"], sigma=5.0, stdfunc="mad_std", cenfunc="median", maxiters=3 ) meany, mediany, stdy = sigma_clipped_stats( tmpList.loc[mask, "shift_y"], sigma=5.0, stdfunc="mad_std", cenfunc="median", maxiters=3 ) # USE MEDIAN FOR ORDERS WITH HIGH SCATTER if stdxy > 1.5: tmpList.loc[mask, "shift_y"] = mediany tmpList.loc[mask, "shift_x"] = medianx print(o, stdxy, "high scatter - using median shift values") # MERGE SHIFTS BACK INTO MAIN DATAFRAME df = df.merge(tmpList, on=["wavelength", "order"], how="outer") # DROP ROWS WITHOUT VALID SHIFTS df.dropna(axis="index", how="any", subset=["shift_x"], inplace=True) # APPLY SHIFTS TO DETECTOR POSITIONS df.loc[:, "detector_x_shifted"] -= df.loc[:, "shift_x"] df.loc[:, "detector_y_shifted"] -= df.loc[:, "shift_y"] # DROP TEMPORARY SHIFT COLUMNS df.drop(columns=["fit_x", "fit_y", "shift_x", "shift_y"], inplace=True) # REMOVE INCOMPLETE MULTI-PINHOLE SETS df = self._remove_incomplete_mph_sets(df) return df def _remove_incomplete_mph_sets(self, df): """*Remove multi-pinhole line sets that don't contain all slit positions*""" # GROUP BY WAVELENGTH AND ORDER lineGroups = df.groupby(["wavelength", "order"]).size().to_frame(name="count").reset_index() fullSet = lineGroups["count"].max() # IDENTIFY INCOMPLETE SETS mask = lineGroups["count"] < fullSet setsToDrop = lineGroups.loc[mask, ["wavelength", "order"]] # MERGE AND FLAG INCOMPLETE SETS s = df[["wavelength", "order"]].merge(setsToDrop, indicator=True, how="left") s["dropped"] = s["_merge"] == "both" df["droppedOnMissing"] = s["dropped"].values # FILTER OUT DROPPED SETS df = df.loc[df["droppedOnMissing"] == False] df.drop(columns=["droppedOnMissing"], inplace=True) return df def _plot_predicted_line_windows(self, orderPixelTable, pinholeFrameMasked): """*Plot predicted line positions with detection windows overlaid*""" import matplotlib.pyplot as plt from matplotlib.patches import Rectangle fig, ax = plt.subplots() ax.imshow( pinholeFrameMasked, cmap="gray", origin="lower", vmin=self.meanFrameFlux, vmax=self.meanFrameFlux + 5 * self.stdFrameFlux, ) # OVERLAY DETECTION WINDOWS for index, row in orderPixelTable.iterrows(): x, y = row["detector_x"], row["detector_y"] ax.add_patch( Rectangle( (x - self.windowSize / 2, y - self.windowSize / 2), self.windowSize, self.windowSize, fill=None, edgecolor="red", ) ) plt.show() def _get_output_filenames(self): """*Generate output filenames for line lists*""" if not self.sofName: filename = filenamer(log=self.log, frame=self.pinholeFrame, settings=self.settings) goodLinesFN = filename.replace(".fits", "_FITTED_LINES.fits") missingLinesFN = filename.replace(".fits", "_MISSED_LINES.fits") else: goodLinesFN = self.sofName + "_FITTED_LINES.fits" missingLinesFN = self.sofName + "_MISSED_LINES.fits" return goodLinesFN, missingLinesFN def _write_line_list_qc(self, clippedLinesTable, utcnow): """*Write QC metric for number of clipped lines*""" import pandas as pd self.CLINE = len(clippedLinesTable.index) self.qc = pd.concat( [ self.qc, pd.Series( { "soxspipe_recipe": self.recipeName, "qc_name": "DETLINES CLIP NUM", "qc_value": self.CLINE, "qc_comment": "Total number of detected lines clipped during solution fitting", "qc_unit": "lines", "obs_date_utc": self.dateObs, "reduction_date_utc": utcnow, "to_header": True, } ) .to_frame() .T, ], ignore_index=True, ) def _prepare_line_list_columns(self, goodLinesTable, clippedLinesTable): """*Prepare and combine good and clipped line lists with proper columns*""" import pandas as pd import numpy as np # DEFINE COLUMNS TO KEEP keepColumns = [ "wavelength", "order", "slit_index", "slit_position", "detector_x", "detector_y", "observed_x", "observed_y", "x_diff", "y_diff", "fit_x", "fit_y", "residuals_x", "residuals_y", "residuals_xy", "sigma_clipped", "sharpness", "roundness1", "roundness2", "npix", "sky", "peak", "flux", "fwhm_pin_px", "R_pin", "pixelScaleNm", "detector_x_shifted", "detector_y_shifted", "R_slit", "fwhm_slit_px", ] # ADD ION COLUMN IF PRESENT if "ion" in goodLinesTable.columns: keepColumns.insert(0, "ion") # MARK CLIPPED LINES clippedLinesTable["sigma_clipped"] = True clippedLinesTable["R"] = np.nan clippedLinesTable["pixelScaleNm"] = np.nan # COMBINE GOOD AND CLIPPED LINES if len(clippedLinesTable.index): goodAndClippedLines = pd.concat( [clippedLinesTable[keepColumns], goodLinesTable[keepColumns]], ignore_index=True ) else: goodAndClippedLines = goodLinesTable[keepColumns] # SORT BOTH DATAFRAMES goodAndClippedLines.sort_values(["order", "wavelength", "slit_index"], inplace=True) goodLinesTable = goodLinesTable[keepColumns] goodLinesTable.sort_values(["order", "wavelength", "slit_index"], inplace=True) return goodAndClippedLines, goodLinesTable def _write_fitted_lines_file(self, goodAndClippedLines, goodLinesFN, utcnow): """*Write fitted lines (good + clipped) to FITS file*""" from astropy.table import Table import pandas as pd t = Table.from_pandas(goodAndClippedLines) filePath = f"{self.qcDir}/{goodLinesFN}" if not self.settings["tune-pipeline"]: t.write(filePath, overwrite=True) self.products = pd.concat( [ self.products, pd.Series( { "soxspipe_recipe": self.recipeName, "product_label": "DISP_MAP_LINES", "file_name": goodLinesFN, "file_type": "FITS", "obs_date_utc": self.dateObs, "reduction_date_utc": utcnow, "product_desc": f"{self.arm} dispersion solution fitted lines", "file_path": filePath, "label": "QC", } ) .to_frame() .T, ], ignore_index=True, ) def _write_missing_lines_file(self, missingLines, missingLinesFN, utcnow): """*Write missing lines to FITS file*""" from astropy.table import Table import pandas as pd keepColumns = ["wavelength", "order", "slit_index", "slit_position", "detector_x", "detector_y"] # SORT AND EXTRACT RELEVANT COLUMNS missingLines.sort_values(["order", "wavelength", "slit_index"], inplace=True) t = Table.from_pandas(missingLines[keepColumns]) filePath = f"{self.qcDir}/{missingLinesFN}" if not self.settings["tune-pipeline"]: t.write(filePath, overwrite=True) self.products = pd.concat( [ self.products, pd.Series( { "soxspipe_recipe": self.recipeName, "product_label": "DISP_MAP_LINES_MISSING", "file_name": missingLinesFN, "file_type": "FITS", "obs_date_utc": self.dateObs, "reduction_date_utc": utcnow, "product_desc": f"{self.arm} undetected arc lines", "file_path": filePath, "label": "QC", } ) .to_frame() .T, ], ignore_index=True, )
[docs] def detect_pinhole_arc_lines( self, orderPixelTable, iraf=True, sigmaLimit=3, iteration=False, brightest=False, exclude_border=False, returnAll=False, ): """*detect the observed position of an arc-line given the predicted pixel positions* **Key Arguments:** - ``orderPixelTable`` -- the initial line list - ``iraf`` -- use IRAF star finder to generate a FWHM - ``sigmaLimit`` -- the lower sigma limit for arc line to be considered detected - ``iteration`` -- which detect and shift iteration are we on? - ``brightest`` -- find the brightest source? - ``exclude_border`` -- exclude border pixels from detection? **Return:** - ``predictedLine`` -- the line with the observed pixel coordinates appended (if detected, otherwise nan) """ self.log.debug("starting the ``detect_pinhole_arc_lines`` method") import numpy as np from fundamentals import fmultiprocess import logging # FIX ASTROPY LOGGING LEVEL RESET ISSUE logging.getLogger().setLevel(logging.INFO + 5) # GET FRAME AND WINDOW PARAMETERS pinholeFrame = self.pinholeFrameMasked windowHalf = self.windowHalf # USE SHIFTED POSITIONS IF AVAILABLE, OTHERWISE USE ORIGINAL PREDICTIONS if "detector_x_shifted" in orderPixelTable.columns: xArray = orderPixelTable["detector_x_shifted"] yArray = orderPixelTable["detector_y_shifted"] else: xArray = orderPixelTable["detector_x"] yArray = orderPixelTable["detector_y"] # CALCULATE STAMP BOUNDARIES AROUND EACH PREDICTED POSITION # ENSURE BOUNDARIES DON'T EXCEED FRAME DIMENSIONS xlows = np.clip(np.round(xArray - windowHalf).astype(int), 0, None) xups = np.clip(np.round(xArray + windowHalf).astype(int), None, pinholeFrame.shape[1]) ylows = np.clip(np.round(yArray - windowHalf).astype(int), 0, None) yups = np.clip(np.round(yArray + windowHalf).astype(int), None, pinholeFrame.shape[0]) # CREATE IMAGE STAMPS (CUTOUTS) FOR EACH PREDICTED LINE POSITION stamps = [ (pinholeFrame[ylow:yup, xlow:xup], xlow, xup, ylow, yup) for xlow, xup, ylow, yup in zip(xlows, xups, ylows, yups) ] # INITIALIZE OUTPUT DICTIONARY FOR LINE MEASUREMENTS predictedLines = { "sharpness": [], # POINT SOURCE SHARPNESS METRIC "roundness1": [], # ROUNDNESS METRIC 1 "roundness2": [], # ROUNDNESS METRIC 2 "npix": [], # NUMBER OF PIXELS IN DETECTION "sky": [], # LOCAL SKY BACKGROUND LEVEL "peak": [], # PEAK PIXEL VALUE "flux": [], # INTEGRATED FLUX "observed_x": [], # MEASURED X POSITION "observed_y": [], # MEASURED Y POSITION "fwhm_pin_px": [], # MEASURED FWHM IN PIXELS } if self.debug or self.turnOffMP: turnOffMP = True else: turnOffMP = False # PROCESS ALL STAMPS USING MULTIPROCESSING (OR SERIAL IN DEBUG MODE) # NOTE: MULTIPROCESSING AND THREADING IS MUCH SLOWER! KEEP TURNED OFF results = fmultiprocess( log=self.log, function=measure_line_position, inputArray=stamps, poolSize=False, timeout=300, mute=False, progressBar=False, turnOffMP=True, # DISABLE MULTIPROCESSING IN DEBUG MODE windowHalf=self.windowHalf, iraf=iraf, sigmaLimit=sigmaLimit, brightest=brightest, exclude_border=exclude_border, iteration=iteration, returnAll=returnAll, debug=self.debug, ) # AGGREGATE RESULTS FROM ALL STAMP MEASUREMENTS for rr in results: for k in predictedLines.keys(): thisList = [r[k] for r in rr] predictedLines[k].append(thisList) print() # ADD MEASUREMENT RESULTS TO ORDER PIXEL TABLE for k, v in predictedLines.items(): orderPixelTable[k] = v self.log.debug("completed the ``detect_pinhole_arc_lines`` method") return orderPixelTable
[docs] def write_map_to_file(self, xcoeff, ycoeff, orderDeg, wavelengthDeg, slitDeg): """*write out the fitted polynomial solution coefficients to file* **Key Arguments:** - ``xcoeff`` -- the x-coefficients - ``ycoeff`` -- the y-coefficients - ``orderDeg`` -- degree of the order fitting - ``wavelengthDeg`` -- degree of wavelength fitting - ``slitDeg`` -- degree of the slit fitting (False for single pinhole) **Return:** - ``disp_map_path`` -- path to the saved file """ self.log.debug("starting the ``write_map_to_file`` method") import pandas as pd from astropy.table import Table from astropy.io import fits from contextlib import suppress import copy import math import numpy as np arm = self.arm kw = self.kw # ORGANIZE X-AXIS POLYNOMIAL COEFFICIENTS INTO DICTIONARY # HANDLE BOTH SINGLE AND PER-AXIS POLYNOMIAL DEGREES if isinstance(orderDeg, list): coeff_dict_x = {} coeff_dict_x["axis"] = "x" coeff_dict_x["order_deg"] = orderDeg[0] coeff_dict_x["wavelength_deg"] = wavelengthDeg[0] coeff_dict_x["slit_deg"] = slitDeg[0] n_coeff = 0 for i in range(0, orderDeg[0] + 1): for j in range(0, wavelengthDeg[0] + 1): for k in range(0, slitDeg[0] + 1): coeff_dict_x[f"c{i}{j}{k}"] = xcoeff[n_coeff] n_coeff += 1 else: coeff_dict_x = {} coeff_dict_x["axis"] = "x" coeff_dict_x["order_deg"] = orderDeg coeff_dict_x["wavelength_deg"] = wavelengthDeg coeff_dict_x["slit_deg"] = slitDeg n_coeff = 0 for i in range(0, orderDeg + 1): for j in range(0, wavelengthDeg + 1): for k in range(0, slitDeg + 1): coeff_dict_x[f"c{i}{j}{k}"] = xcoeff[n_coeff] n_coeff += 1 # ORGANIZE Y-AXIS POLYNOMIAL COEFFICIENTS INTO DICTIONARY # HANDLE BOTH SINGLE AND PER-AXIS POLYNOMIAL DEGREES if isinstance(orderDeg, list): coeff_dict_y = {} coeff_dict_y["axis"] = "y" coeff_dict_y["order_deg"] = orderDeg[1] coeff_dict_y["wavelength_deg"] = wavelengthDeg[1] coeff_dict_y["slit_deg"] = slitDeg[1] n_coeff = 0 for i in range(0, orderDeg[1] + 1): for j in range(0, wavelengthDeg[1] + 1): for k in range(0, slitDeg[1] + 1): coeff_dict_y[f"c{i}{j}{k}"] = ycoeff[n_coeff] n_coeff += 1 else: coeff_dict_y = {} coeff_dict_y["axis"] = "y" coeff_dict_y["order_deg"] = orderDeg coeff_dict_y["wavelength_deg"] = wavelengthDeg coeff_dict_y["slit_deg"] = slitDeg n_coeff = 0 for i in range(0, orderDeg + 1): for j in range(0, wavelengthDeg + 1): for k in range(0, slitDeg + 1): coeff_dict_y[f"c{i}{j}{k}"] = ycoeff[n_coeff] n_coeff += 1 # GENERATE OUTPUT FILENAME FROM FRAME OR SOF NAME if not self.sofName: filename = filenamer(log=self.log, frame=self.pinholeFrame, settings=self.settings) else: filename = self.sofName + ".fits" # PREPARE HEADER BY COPYING FRAME HEADER AND REMOVING UNWANTED KEYWORDS header = copy.deepcopy(self.pinholeFrame.header) header.pop(kw("DPR_TECH")) header.pop(kw("DPR_CATG")) header.pop(kw("DPR_TYPE")) # REMOVE DETECTOR-SPECIFIC KEYWORDS (MAY NOT EXIST IN ALL HEADERS) with suppress(KeyError): header.pop(kw("DET_READ_SPEED")) with suppress(KeyError, LookupError): header.pop(kw("CONAD")) with suppress(KeyError): header.pop(kw("GAIN")) with suppress(KeyError): header.pop(kw("RON")) # SET PROCESSING TECHNIQUE BASED ON FRAME TYPE if slitDeg == 0 or slitDeg == [0, 0]: # SINGLE PINHOLE FRAME header[kw("PRO_TECH").upper()] = "ECHELLE,PINHOLE" else: # MULTI-PINHOLE FRAME header[kw("PRO_TECH").upper()] = "ECHELLE,MULTI-PINHOLE" filePath = f"{self.productDir}/{filename}" # CREATE BINARY TABLE WITH COEFFICIENT DICTIONARIES df = pd.DataFrame([coeff_dict_x, coeff_dict_y]) t = Table.from_pandas(df) # SORT COLUMNS: METADATA FIRST, THEN COEFFICIENTS ALPHABETICALLY cols = list(t.columns) startList = ["axis", "order_deg", "wavelength_deg", "slit_deg"] cols = [c for c in cols if c not in startList] cols.sort() cols = startList + cols t = t[cols] # CONVERT TO FITS BINARY TABLE HDU BinTableHDU = fits.table_to_hdu(t) # UPDATE HEADER WITH STANDARD PROCESSING KEYWORDS header[kw("SEQ_ARM").upper()] = arm header[kw("PRO_TYPE").upper()] = "REDUCED" header[kw("PRO_CATG").upper()] = f"DISP_TAB_{arm}".upper() self.dispMapHeader = header # ADD QC METRICS TO HEADER for n, v, c, h in zip( self.qc["qc_name"].values, self.qc["qc_value"].values, self.qc["qc_comment"].values, self.qc["to_header"].values, ): if h and v is not np.nan: header[f"ESO QC {n}".upper()] = (v, c) # CREATE PRIMARY HDU AND WRITE FITS FILE priHDU = fits.PrimaryHDU(header=header) hduList = fits.HDUList([priHDU, BinTableHDU]) hduList.verify("fix") hduList.writeto(filePath, checksum=True, overwrite=True) if False: # MAKE RELATIVE HOME PATH ABSOLUTE from os.path import expanduser home = expanduser("~") cache = self.settings["workspace-root-dir"].replace("~", home) + "/.cache" # Recursively create missing directories if not os.path.exists(cache): os.makedirs(cache) polyOrders = [orderDeg, wavelengthDeg, slitDeg] if isinstance(orderDeg, list): merged_list = [] for sublist in polyOrders: merged_list.extend(sublist) polyOrders = merged_list polyOrders[:] = [str(l) for l in polyOrders] polyOrders = "".join(polyOrders) filename = f"{self.recipeName}_{self.arm}_{polyOrders}.fits" cacheFilePath = f"{cache}/{filename}" hduList.verify("fix") hduList.writeto(cacheFilePath, checksum=True, overwrite=True) self.log.debug("completed the ``write_map_to_file`` method") return filePath
[docs] def calculate_residuals( self, orderPixelTable, xcoeff, ycoeff, orderDeg, wavelengthDeg, slitDeg, writeQCs=False, pixelRange=False ): """*calculate residuals of the polynomial fits against the observed line positions* **Key Arguments:** - ``orderPixelTable`` -- the predicted line list as a data frame - ``xcoeff`` -- the x-coefficients - ``ycoeff`` -- the y-coefficients - ``orderDeg`` -- degree of the order fitting - ``wavelengthDeg`` -- degree of wavelength fitting - ``slitDeg`` -- degree of the slit fitting (False for single pinhole) - ``writeQCs`` -- write the QCs to dataframe? Default *False* - ``pixelRange`` -- return centre pixel *and* +- 2nm from the centre pixel (to measure the pixel scale) **Return:** - ``residuals`` -- combined x-y residuals - ``mean`` -- the mean of the combine residuals - ``std`` -- the stdev of the combine residuals - ``median`` -- the median of the combine residuals """ self.log.debug("starting the ``calculate_residuals`` method") import numpy as np import pandas as pd arm = self.arm utcnow = datetime.now(timezone.utc) utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S") # ADD EXPONENTS TO ORDERTABLE UP-FRONT if isinstance(orderDeg, list): orderDegx = orderDeg[0] orderDegy = orderDeg[1] wavelengthDegx = wavelengthDeg[0] wavelengthDegy = wavelengthDeg[1] slitDegx = slitDeg[0] slitDegy = slitDeg[1] else: orderDegx = orderDeg orderDegy = orderDeg wavelengthDegx = wavelengthDeg wavelengthDegy = wavelengthDeg slitDegx = slitDeg slitDegy = slitDeg polyx = chebyshev_order_wavelength_polynomials( log=self.log, orderDeg=orderDegx, wavelengthDeg=wavelengthDegx, slitDeg=slitDegx, exponentsIncluded=True, axis="x", ).poly polyy = chebyshev_order_wavelength_polynomials( log=self.log, orderDeg=orderDegy, wavelengthDeg=wavelengthDegy, slitDeg=slitDegy, exponentsIncluded=True, axis="y", ).poly # CALCULATE X & Y RESIDUALS BETWEEN OBSERVED LINE POSITIONS AND POLY # FITTED POSITIONS orderPixelTable["fit_x"] = polyx(orderPixelTable, *xcoeff) orderPixelTable["fit_y"] = polyy(orderPixelTable, *ycoeff) if pixelRange == True: polyx = chebyshev_order_wavelength_polynomials( log=self.log, orderDeg=orderDegx, wavelengthDeg=wavelengthDegx, slitDeg=slitDegx, exponentsIncluded=False, axis="x", ).poly polyy = chebyshev_order_wavelength_polynomials( log=self.log, orderDeg=orderDegy, wavelengthDeg=wavelengthDegy, slitDeg=slitDegy, exponentsIncluded=False, axis="y", ).poly # GET THE PIXEL SCALE orderPixelTableHigh = orderPixelTable.copy() nmRange = 4.0 orderPixelTableHigh["wavelength"] = orderPixelTableHigh["wavelength"] + nmRange / 2.0 orderPixelTableHigh["fit_x"] = polyx(orderPixelTableHigh, *xcoeff) orderPixelTableHigh["fit_y"] = polyy(orderPixelTableHigh, *ycoeff) orderPixelTableLow = orderPixelTable.copy() orderPixelTableLow["wavelength"] = orderPixelTableLow["wavelength"] - nmRange / 2.0 orderPixelTableLow["fit_x"] = polyx(orderPixelTableLow, *xcoeff) orderPixelTableLow["fit_y"] = polyy(orderPixelTableLow, *ycoeff) orderPixelTable["fit_x_high"] = orderPixelTableHigh["fit_x"] orderPixelTable["fit_y_high"] = orderPixelTableHigh["fit_y"] orderPixelTable["fit_x_low"] = orderPixelTableLow["fit_x"] orderPixelTable["fit_y_low"] = orderPixelTableLow["fit_y"] orderPixelTable["pixelScaleNm"] = nmRange / np.power( np.power(orderPixelTable["fit_x_high"] - orderPixelTable["fit_x_low"], 2) + np.power(orderPixelTable["fit_y_high"] - orderPixelTable["fit_y_low"], 2), 0.5, ) orderPixelTable["delta_wavelength"] = orderPixelTable["pixelScaleNm"] * orderPixelTable["fwhm_pin_px"] orderPixelTable["R_pin"] = orderPixelTable["wavelength"] / orderPixelTable["delta_wavelength"] # REMOVE COLUMN FROM DATA FRAME orderPixelTable.drop(columns=["fit_x_high", "fit_y_high", "fit_x_low", "fit_y_low"], inplace=True) orderPixelTable["residuals_x"] = orderPixelTable["fit_x"] - orderPixelTable["observed_x"] orderPixelTable["residuals_y"] = orderPixelTable["fit_y"] - orderPixelTable["observed_y"] # CALCULATE COMBINED RESIDUALS AND STATS orderPixelTable["residuals_xy"] = np.sqrt( np.square(orderPixelTable["residuals_x"]) + np.square(orderPixelTable["residuals_y"]) ) combined_res_mean = np.mean(orderPixelTable["residuals_xy"]) combined_res_std = np.std(orderPixelTable["residuals_xy"]) combined_res_median = np.median(orderPixelTable["residuals_xy"]) if self.arcFrame: self.slitWidth = self.arcFrame.header[self.kw(f"SLIT_{arm}")].replace("SLIT", "").split("x")[0] orderPixelTable[["R_slit", "fwhm_slit_px"]] = orderPixelTable.apply( self._calculate_resolution_on_slit, axis=1 ) else: orderPixelTable["R_slit"] = 0.0 orderPixelTable["fwhm_slit_px"] = 0.0 if writeQCs: absx = abs(orderPixelTable["residuals_x"]) absy = abs(orderPixelTable["residuals_y"]) fwhm_med = orderPixelTable["fwhm_pin_px"].median() resolution_med = orderPixelTable["R_pin"].median() fwhm_SD = orderPixelTable["fwhm_pin_px"].std() resolution_SD = orderPixelTable["R_pin"].std() orderPixelTable["x_diff"] = orderPixelTable["detector_x"] - orderPixelTable["observed_x"] orderPixelTable["y_diff"] = orderPixelTable["detector_y"] - orderPixelTable["observed_y"] orderPixelTable["xy_diff"] = np.sqrt( np.square(orderPixelTable["x_diff"]) + np.square(orderPixelTable["y_diff"]) ) qc_names = [ "X RES MIN", "X RES MEDIAN", "X RES MAX", "X RES SD", "Y RES MIN", "Y RES MEDIAN", "Y RES MAX", "Y RES SD", "XY RES MIN", "XY RES MEDIAN", "XY RES MAX", "XY RES SD", "X DIFF MEDIAN", "Y DIFF MEDIAN", "XY DIFF MEDIAN", "X DIFF SD", "Y DIFF SD", "XY DIFF SD", "FWHM PIN MEDIAN", "FWHM PIN SD", "R PIN MEDIAN", "R PIN SD", ] qc_order = [None] * len(qc_names) qc_values = [ absx.min(), np.median(absx), absx.max(), absx.std(), absy.min(), np.median(absy), absy.max(), absy.std(), orderPixelTable["residuals_xy"].min(), np.median(orderPixelTable["residuals_xy"]), orderPixelTable["residuals_xy"].max(), orderPixelTable["residuals_xy"].std(), orderPixelTable["x_diff"].abs().median(), orderPixelTable["y_diff"].abs().median(), orderPixelTable["xy_diff"].median(), orderPixelTable["x_diff"].abs().std(), orderPixelTable["y_diff"].abs().std(), orderPixelTable["xy_diff"].std(), fwhm_med, fwhm_SD, resolution_med, resolution_SD, ] qc_units = ["pixels"] * 20 + [""] * 2 qc_comments = [ "[px] Minimum residual in dispersion solution fit along x-axis", "[px] Median residual in dispersion solution fit along x-axis", "[px] Maximum residual in dispersion solution fit along x-axis", "[px] Std-dev of residual in dispersion solution fit along x-axis", "[px] Minimum residual in dispersion solution fit along y-axis", "[px] Median residual in dispersion solution fit along y-axis", "[px] Maximum residual in dispersion solution fit along y-axis", "[px] Std-dev of residual in dispersion solution fit along y-axis", "[px] Minimum residual in dispersion solution fit (XY combined)", "[px] Median residual in dispersion solution fit (XY combined)", "[px] Maximum residual in dispersion solution fit (XY combined)", "[px] Std-dev of residual in dispersion solution (XY combined)", "[px] Median absolute difference between predicted and observed line positions along x-axis", "[px] Median absolute difference between predicted and observed line positions along y-axis", "[px] Median difference between predicted and observed line positions (XY combined)", "[px] Std-dev of difference between predicted and observed line positions along x-axis", "[px] Std-dev of difference between predicted and observed line positions along y-axis", "[px] Std-dev of difference between predicted and observed line positions (XY combined)", "[px] Median FWHM of detected lines in pinhole frames", "[px] Std-dev FWHM of detected lines in pinhole frames", "Median spectral resolution measured from detected lines in pinhole frames", "Std-dev spectral resolution measured from detected lines in pinhole frames", ] if len(qc_units) != len(qc_names) or len(qc_comments) != len(qc_names) or len(qc_values) != len(qc_names): raise ValueError("Mismatch in lengths of QC arrays") uniqueOrders = orderPixelTable["order"].unique() for o in uniqueOrders: thisOrder = orderPixelTable.loc[orderPixelTable["order"] == o] absx_order = abs(thisOrder["residuals_x"]) absy_order = abs(thisOrder["residuals_y"]) fwhm_order = thisOrder["fwhm_pin_px"].median() resolution_order = thisOrder["R_pin"].median() fwhm_order_sd = thisOrder["fwhm_pin_px"].std() resolution_order_sd = thisOrder["R_pin"].std() xdiff = thisOrder["x_diff"].median() ydiff = thisOrder["y_diff"].median() xydiff = np.median(np.sqrt(np.square(xdiff) + np.square(ydiff))) xdiff_sd = thisOrder["x_diff"].std() ydiff_sd = thisOrder["y_diff"].std() xydiff_sd = np.sqrt(np.square(xdiff_sd) + np.square(ydiff_sd)).std() o = int(o) if self.inst == "SOXS" and self.arm == "VIS": for n, oo in zip(["u", "g", "r", "i"], [2, 3, 1, 4]): if o == oo: ostr = n odb = n break else: ostr = f"O{o}" odb = o arrayOfNames = [ f"X RES MEDIAN", f"Y RES MEDIAN", f"XY RES MEDIAN", f"X RES SD", f"Y RES SD", f"XY RES SD", f"X DIFF MEDIAN", f"Y DIFF MEDIAN", f"XY DIFF MEDIAN", f"X DIFF SD", f"Y DIFF SD", f"XY DIFF SD", f"FWHM PIN MEDIAN", f"FWHM PIN SD", f"R PIN MEDIAN", f"R PIN SD", ] qc_names.extend(arrayOfNames) qc_order.extend([odb] * len(arrayOfNames)) qc_values.extend( [ np.median(absx_order), np.median(absy_order), np.median(thisOrder["residuals_xy"]), absx_order.std(), absy_order.std(), thisOrder["residuals_xy"].std(), xdiff, ydiff, xydiff, xdiff_sd, ydiff_sd, xydiff_sd, fwhm_order, fwhm_order_sd, resolution_order, resolution_order_sd, ] ) qc_units.extend(["pixels"] * 14 + [""] * 2) qc_comments.extend( [ f"[px] Median residual in dispersion solution fit along x-axis for order {o}", f"[px] Median residual in dispersion solution fit along y-axis for order {o}", f"[px] Median residual in dispersion solution fit (XY combined) for order {o}", f"[px] Std-dev of residual in dispersion solution fit along x-axis for order {o}", f"[px] Std-dev of residual in dispersion solution fit along y-axis for order {o}", f"[px] Std-dev of residual in dispersion solution (XY combined) for order {o}", f"[px] Median difference between predicted and observed line positions along x-axis for order {o}", f"[px] Median difference between predicted and observed line positions along y-axis for order {o}", f"[px] Median difference between predicted and observed line positions (XY combined) for order {o}", f"[px] Std-dev of difference between predicted and observed line positions along x-axis for order {o}", f"[px] Std-dev of difference between predicted and observed line positions along y-axis for order {o}", f"[px] Std-dev of difference between predicted and observed line positions (XY combined) for order {o}", f"[px] Median FWHM of detected lines in pinhole frames for order {o}", f"[px] Std-dev of FWHM of detected lines in pinhole frames for order {o}", f"Median spectral resolution measured from detected lines in pinhole frames for order {o}", f"Std-dev of spectral resolution measured from detected lines in pinhole frames for order {o}", ] ) if ( len(qc_units) != len(qc_names) or len(qc_comments) != len(qc_names) or len(qc_values) != len(qc_names) or len(qc_order) != len(qc_names) or len(qc_order) != len(qc_values) ): raise ValueError("Mismatch in lengths of QC arrays") for name, value, unit, comment, order in zip(qc_names, qc_values, qc_units, qc_comments, qc_order): if unit != "lines": value = f"{value:0.3f}" self.qc = pd.concat( [ self.qc, pd.Series( { "soxspipe_recipe": self.recipeName, "qc_name": name, "qc_value": value, "qc_order": order, "qc_comment": comment, "qc_unit": unit, "obs_date_utc": self.dateObs, "reduction_date_utc": utcnow, "to_header": True, } ) .to_frame() .T, ], ignore_index=True, ) self.log.debug("completed the ``calculate_residuals`` method") return combined_res_mean, combined_res_std, combined_res_median, orderPixelTable
def _calculate_resolution_on_slit(self, row): import math from astropy.modeling import models, fitting import numpy as np import matplotlib.pyplot as plt import pandas as pd stdToFwhm = 2 * (2 * math.log(2)) ** 0.5 # GOOD GUESS AT STD AND SLICE SIZE if self.arm == "NIR": guessStd = (float(self.slitWidth) / 0.25) / stdToFwhm else: guessStd = (float(self.slitWidth) / 0.29) / stdToFwhm sliceSize = int(guessStd * 5.0) if self.detectorParams["dispersion-axis"] == "y": detector_row = self.arcFrame.data[ int(row["observed_y"]), int(row["observed_x"]) - sliceSize : int(row["observed_x"]) + sliceSize ] detector_row = detector_row - np.median(detector_row) detector_row = np.where(detector_row < 0, 0, detector_row) else: detector_row = self.arcFrame.data[ int(row["observed_y"]) - sliceSize : int(row["observed_y"]) + sliceSize, int(row["observed_x"]) ] detector_row = detector_row - np.median(detector_row) detector_row = np.where(detector_row < 0, 0, detector_row) # FITTING THE LINE WITH A GAUSSIAN PROFILE g_init = models.Gaussian1D(amplitude=1.0, mean=0, stddev=guessStd) fit_g = fitting.LevMarLSQFitter() try: g = fit_g(g_init, np.arange(0, len(detector_row)), detector_row) if g.stddev.value < guessStd / 2.0: # 1.0 SLIT CANNOT BE LESS THAN 1.0 px raise Exception # stddev_corrected = np.sqrt(g.stddev.value*g.stddev.value - np.abs(13*np.sin(row['tilt'])*np.sin(row['tilt']))) stddev_corrected = float(g.stddev.value) fwhm = stddev_corrected * stdToFwhm import random random_number = random.randint(1, 5) print("COME BACK HERE AND TRASH SOME LINES") if False and random_number == 2: gaussx = np.arange(0, len(detector_row), 0.05) plt.plot(np.arange(0, len(detector_row)), detector_row) stddev_corrected * stdToFwhm plt.plot(gaussx, g(gaussx), label=f"STD = {g.stddev.value:0.2f}, FWHM = {fwhm:0.2f}") plt.legend() plt.show() except Exception as e: return pd.Series([None, None]) # stddev_corrected = np.sqrt(g.stddev.value*g.stddev.value - np.abs(13*np.sin(row['tilt'])*np.sin(row['tilt']))) stddev_corrected = float(g.stddev.value) # ENFORCING RESONABLE VALUES EXCLUDING OUTLIERS if stddev_corrected < guessStd / 2.0 or stddev_corrected > guessStd * 2.0: return pd.Series([None, None]) delta_wavelength = row["pixelScaleNm"] * fwhm resolution_line = row["wavelength"] / delta_wavelength return pd.Series([resolution_line, fwhm])
[docs] def fit_polynomials(self, orderPixelTable, wavelengthDeg, orderDeg, slitDeg, missingLines=False): """*iteratively fit the dispersion map polynomials to the data, clipping residuals with each iteration* **Key Arguments:** - ``orderPixelTable`` -- data frame containing order, wavelengths, slit positions and observed pixel positions - ``wavelengthDeg`` -- degree of wavelength fitting - ``orderDeg`` -- degree of the order fitting - ``slitDeg`` -- degree of the slit fitting (0 for single pinhole) - ``missingLines`` -- lines not detected on the image **Return:** - ``xcoeff`` -- the x-coefficients post clipping - ``ycoeff`` -- the y-coefficients post clipping - ``goodLinesTable`` -- the fitted line-list with metrics - ``clippedLinesTable`` -- the lines that were sigma-clipped during polynomial fitting """ self.log.debug("starting the ``fit_polynomials`` method") # FIRST REMOVE DROPPED LINES FILTERED ROWS FROM DATA FRAME allClippedLines = [] mask = orderPixelTable["dropped"] == True allClippedLines.append(orderPixelTable.loc[mask]) orderPixelTable = orderPixelTable.loc[~mask] import numpy as np from astropy.stats import sigma_clip from scipy.optimize import curve_fit import pandas as pd from soxspipe.commonutils import get_cached_coeffs arm = self.arm dp = self.detectorParams clippedCount = 1 # ADD EXPONENTS TO ORDERTABLE UP-FRONT if isinstance(orderDeg, list): orderDegx = orderDeg[0] orderDegy = orderDeg[1] wavelengthDegx = wavelengthDeg[0] wavelengthDegy = wavelengthDeg[1] slitDegx = slitDeg[0] slitDegy = slitDeg[1] else: orderDegx = orderDeg orderDegy = orderDeg wavelengthDegx = wavelengthDeg wavelengthDegy = wavelengthDeg slitDegx = slitDeg slitDegy = slitDeg for i in range(0, orderDegx + 1): orderPixelTable[f"order_pow_x_{i}"] = orderPixelTable["order"].pow(i) for j in range(0, wavelengthDegx + 1): orderPixelTable[f"wavelength_pow_x_{j}"] = orderPixelTable["wavelength"].pow(j) for k in range(0, slitDegx + 1): orderPixelTable[f"slit_position_pow_x_{k}"] = orderPixelTable["slit_position"].pow(k) for i in range(0, orderDegy + 1): orderPixelTable[f"order_pow_y_{i}"] = orderPixelTable["order"].pow(i) for j in range(0, wavelengthDegy + 1): orderPixelTable[f"wavelength_pow_y_{j}"] = orderPixelTable["wavelength"].pow(j) for k in range(0, slitDegy + 1): orderPixelTable[f"slit_position_pow_y_{k}"] = orderPixelTable["slit_position"].pow(k) polyx = chebyshev_order_wavelength_polynomials( log=self.log, orderDeg=orderDegx, wavelengthDeg=wavelengthDegx, slitDeg=slitDegx, exponentsIncluded=True, axis="x", ).poly polyy = chebyshev_order_wavelength_polynomials( log=self.log, orderDeg=orderDegy, wavelengthDeg=wavelengthDegy, slitDeg=slitDegy, exponentsIncluded=True, axis="y", ).poly clippingSigma = self.recipeSettings["poly-fitting-residual-clipping-sigma"] clippingSigmaX = clippingSigma clippingSigmaY = clippingSigma clippingIterationLimit = self.recipeSettings["poly-clipping-iteration-limit"] if "poly-clipping-pinhole-sets" in self.recipeSettings and self.recipeSettings["poly-clipping-pinhole-sets"]: clipOnMphSets = True else: clipOnMphSets = False self.log.print("\n# FINDING DISPERSION SOLUTION") iteration = 0 # CREATE A QUICK FIRST GUESS AT COEFFS ... SPEEDS UP FIRST ITERATION OF FULL SET tmpDF = orderPixelTable.copy() mean_res = 100.0 # orderPixelTable["observed_x"]=orderPixelTable["tilt_corrected_x"] # orderPixelTable["observed_y"]=orderPixelTable["tilt_corrected_y"] orderPixelTable["sigma_clipped"] = False while clippedCount > 0 and iteration < clippingIterationLimit: iteration += 1 observed_x = orderPixelTable["observed_x"].to_numpy() observed_y = orderPixelTable["observed_y"].to_numpy() # IF mean_res > 10 WE WANT TO START FROM SCRATCH AGAIN SO NOT TO INFLUENCE THE FINAL RESULT if True or mean_res > 10: # FIND CACHED COEFF ELSE RETURN ARRAYS OF 1s xcoeff, ycoeff = get_cached_coeffs( log=self.log, arm=arm, settings=self.settings, recipeName=self.recipeName, orderDeg=orderDeg, wavelengthDeg=wavelengthDeg, slitDeg=slitDeg, reset=True, ) # USE LEAST-SQUARED CURVE FIT TO FIT CHEBY POLYS # FIRST X self.log.info("""curvefit x""" % locals()) try: xcoeff, pcov_x = curve_fit(polyx, xdata=orderPixelTable, ydata=observed_x, p0=xcoeff, maxfev=30000) except: return "xerror", None, None, None # NOW Y self.log.info("""curvefit y""" % locals()) try: ycoeff, pcov_y = curve_fit(polyy, xdata=orderPixelTable, ydata=observed_y, p0=ycoeff, maxfev=30000) except: return None, "yerror", None, None self.log.info("""calculate_residuals""" % locals()) mean_res, std_res, median_res, orderPixelTable = self.calculate_residuals( orderPixelTable=orderPixelTable, xcoeff=xcoeff, ycoeff=ycoeff, orderDeg=orderDeg, wavelengthDeg=wavelengthDeg, slitDeg=slitDeg, pixelRange=True, writeQCs=False, ) # DO SOME CLIPPING ON THE PROFILES OF THE DETECTED LINES if iteration == -1: orderPixelTable = self._clip_on_measured_line_metrics(orderPixelTable) # COUNT THE CLIPPED LINES mask = orderPixelTable["sigma_clipped"] == True allClippedLines.append(orderPixelTable.loc[mask]) mask = orderPixelTable["sigma_clipped"] == True orderPixelTable = orderPixelTable.loc[~mask] # SIGMA-CLIP THE DATA self.log.info("""sigma_clip""" % locals()) if clipOnMphSets: columnsNoStrings = list(orderPixelTable.columns) try: columnsNoStrings.remove("ion") except: pass # GROUP BY ARC LINES (MPH SETS) lineGroups = orderPixelTable[columnsNoStrings].groupby(["wavelength", "order"]).mean() lineGroups = lineGroups.reset_index() # SIGMA-CLIP THE DATA ON SCATTER masked_residuals = sigma_clip( lineGroups["residuals_x"].abs(), sigma_lower=3000, sigma_upper=clippingSigmaX, maxiters=1, cenfunc="mean", stdfunc="std", ) lineGroups["sigma_clipped_x"] = masked_residuals.mask masked_residuals = sigma_clip( lineGroups["residuals_y"].abs(), sigma_lower=3000, sigma_upper=clippingSigmaY, maxiters=1, cenfunc="mean", stdfunc="std", ) lineGroups["sigma_clipped_y"] = masked_residuals.mask lineGroups.loc[ ((lineGroups["sigma_clipped_y"] == True) | (lineGroups["sigma_clipped_x"] == True)), "sigma_clipped" ] = True if True: # CLIP ALSO ON COMBINED RESIDUALS masked_residuals = sigma_clip( lineGroups["residuals_xy"], sigma_lower=5000, sigma_upper=clippingSigma, maxiters=1, cenfunc="mean", stdfunc="std", ) lineGroups["sigma_clipped_xy"] = masked_residuals.mask lineGroups.loc[ ( (lineGroups["sigma_clipped_y"] == True) | (lineGroups["sigma_clipped_x"] == True) | (lineGroups["sigma_clipped_xy"] == True) ), "sigma_clipped", ] = True # REMOVE THE CLIPPED DATA BEFORE CLIPPING ON FLUX mask = lineGroups["sigma_clipped"] == True clippedGroups = lineGroups.loc[mask] clippedGroups = clippedGroups[["wavelength", "order"]] s = orderPixelTable[["wavelength", "order"]].merge(clippedGroups, indicator=True, how="left") s["clipped"] = False s.loc[(s["_merge"] == "both"), "clipped"] = True orderPixelTable["sigma_clipped"] = s["clipped"].values # CLIP THE MOST DEVIATE SINGLE PINHOLES if iteration == 1 and True: masked_residuals = sigma_clip( orderPixelTable["residuals_x"].abs(), sigma_lower=3000, sigma_upper=clippingSigmaX * 3, maxiters=1, cenfunc="mean", stdfunc="std", ) orderPixelTable["sigma_clipped_x"] = masked_residuals.mask masked_residuals = sigma_clip( orderPixelTable["residuals_y"].abs(), sigma_lower=3000, sigma_upper=clippingSigmaY * 3, maxiters=1, cenfunc="mean", stdfunc="std", ) orderPixelTable["sigma_clipped_y"] = masked_residuals.mask masked_residuals = sigma_clip( orderPixelTable["R_pin"], sigma_lower=3000, sigma_upper=10, maxiters=1, cenfunc="mean", stdfunc="std", ) orderPixelTable["sigma_clipped_R"] = masked_residuals.mask orderPixelTable.loc[ ( (orderPixelTable["sigma_clipped_y"] == True) | (orderPixelTable["sigma_clipped_x"] == True) | (orderPixelTable["sigma_clipped_R"] == True) ), "sigma_clipped", ] = True else: masked_residuals = sigma_clip( orderPixelTable["residuals_x"].abs(), sigma_lower=3000, sigma_upper=clippingSigmaX * 1, maxiters=3, cenfunc="mean", stdfunc="std", ) orderPixelTable["sigma_clipped_x"] = masked_residuals.mask masked_residuals = sigma_clip( orderPixelTable["residuals_y"].abs(), sigma_lower=3000, sigma_upper=clippingSigmaY * 1, maxiters=3, cenfunc="mean", stdfunc="std", ) orderPixelTable["sigma_clipped_y"] = masked_residuals.mask orderPixelTable.loc[ ((orderPixelTable["sigma_clipped_y"] == True) | (orderPixelTable["sigma_clipped_x"] == True)), "sigma_clipped", ] = True if True: # CLIP ALSO ON COMBINED RESIDUALS masked_residuals = sigma_clip( orderPixelTable["residuals_xy"], sigma_lower=5000, sigma_upper=clippingSigma, maxiters=1, cenfunc="mean", stdfunc="std", ) orderPixelTable["sigma_clipped_xy"] = masked_residuals.mask try: orderPixelTable.loc[ ( (orderPixelTable["sigma_clipped_y"] == True) | (orderPixelTable["sigma_clipped_x"] == True) | (orderPixelTable["sigma_clipped_xy"] == True) ), "sigma_clipped", ] = True except: orderPixelTable.loc[(orderPixelTable["sigma_clipped_xy"] == True), "sigma_clipped"] = True # COUNT THE CLIPPED LINES mask = orderPixelTable["sigma_clipped"] == True allClippedLines.append(orderPixelTable.loc[mask]) totalAllClippedLines = pd.concat(allClippedLines, ignore_index=True) # RETURN BREAKDOWN OF COLUMN VALUE COUNT valCounts = orderPixelTable["sigma_clipped"].value_counts(normalize=False) if True in valCounts: clippedCount = valCounts[True] else: clippedCount = 0 if iteration > 1: # Cursor up one line and clear line sys.stdout.flush() sys.stdout.write("\x1b[1A\x1b[2K") try: self.log.print( f"\tITERATION {iteration:02d}: {clippedCount} arc lines where clipped in this iteration of fitting a global dispersion map" ) except: pass mask = orderPixelTable["sigma_clipped"] == True orderPixelTable = orderPixelTable.loc[~mask] if len(allClippedLines): allClippedLines = pd.concat(allClippedLines, ignore_index=True) mean_res, std_res, median_res, orderPixelTable = self.calculate_residuals( orderPixelTable=orderPixelTable, xcoeff=xcoeff, ycoeff=ycoeff, orderDeg=orderDeg, wavelengthDeg=wavelengthDeg, slitDeg=slitDeg, writeQCs=True, pixelRange=True, ) self.log.debug("completed the ``fit_polynomials`` method") return xcoeff, ycoeff, orderPixelTable, allClippedLines
[docs] def create_placeholder_images(self, order=False, plot=False, reverse=False): """*create CCDData objects as placeholders to host the 2D images of the wavelength and spatial solutions from dispersion solution map* **Key Arguments:** - ``order`` -- specific order to generate the placeholder pixels for. Inner-order pixels set to nan, else set to 0. Default *False* (generate all orders) - ``plot`` -- generate plots of placeholder images (for debugging). Default *False*. - ``reverse`` -- Inner-order pixels set to 0, else set to nan (reverse of default output). **Return:** - ``slitMap`` -- placeholder image to add pixel slit positions to - ``wlMap`` -- placeholder image to add pixel wavelength values to **Usage:** ```python slitMap, wlMap, orderMap = self._create_placeholder_images(order=order) ``` """ self.log.debug("starting the ``create_placeholder_images`` method") import numpy as np from astropy.nddata import CCDData kw = self.kw dp = self.detectorParams # UNPACK THE ORDER TABLE orderPolyTable, orderPixelTable, orderMetaTable = unpack_order_table( log=self.log, orderTablePath=self.orderTable, extend=0.0, order=order ) # CREATE THE IMAGE SAME SIZE AS DETECTOR - NAN INSIDE ORDERS, 0 OUTSIDE science_pixels = dp["science-pixels"] xlen = science_pixels["columns"]["end"] - science_pixels["columns"]["start"] ylen = science_pixels["rows"]["end"] - science_pixels["rows"]["start"] xlen, ylen if reverse: seedArray = np.empty((ylen, xlen)) seedArray[:] = np.nan else: seedArray = np.zeros((ylen, xlen)) wlMap = CCDData(seedArray, unit="adu") orderMap = wlMap.copy() uniqueOrders = orderPixelTable["order"].unique() expandEdges = 0 if self.detectorParams["dispersion-axis"] == "x": axisALen = wlMap.data.shape[1] axisBLen = wlMap.data.shape[0] else: axisALen = wlMap.data.shape[0] axisBLen = wlMap.data.shape[1] for o in uniqueOrders: if order and o != order: continue axisBcoord = orderPixelTable.loc[(orderPixelTable["order"] == o)][f"{self.axisB}coord"] axisACoord_edgeup = ( orderPixelTable.loc[(orderPixelTable["order"] == o)][f"{self.axisA}coord_edgeup"] + expandEdges ) axisACoord_edgelow = ( orderPixelTable.loc[(orderPixelTable["order"] == o)][f"{self.axisA}coord_edgelow"] - expandEdges ) axisACoord_edgeup[axisACoord_edgeup > axisALen] = axisALen axisACoord_edgeup[axisACoord_edgeup < 0] = 0 axisACoord_edgelow[axisACoord_edgelow > axisALen] = axisALen axisACoord_edgelow[axisACoord_edgelow < 0] = 0 axisACoord_edgelow, axisACoord_edgeup, axisBcoord = zip( *[ (l, u, b) for l, u, b in zip(axisACoord_edgelow, axisACoord_edgeup, axisBcoord) if l >= 0 and l <= axisALen and u >= 0 and u <= axisALen and b >= 0 and b < axisBLen ] ) if reverse: for b, u, l in zip( axisBcoord, np.ceil(axisACoord_edgeup).astype(int), np.floor(axisACoord_edgelow).astype(int) ): if self.axisA == "x": wlMap.data[b, l:u] = 0 orderMap.data[b, l:u] = o else: wlMap.data[l:u, b] = 0 orderMap.data[l:u, b] = o else: for b, u, l in zip( axisBcoord, np.ceil(axisACoord_edgeup).astype(int), np.floor(axisACoord_edgelow).astype(int) ): if self.axisA == "x": wlMap.data[b, l:u] = np.nan orderMap.data[b, l:u] = np.nan else: wlMap.data[l:u, b] = np.nan orderMap.data[l:u, b] = np.nan # SLIT MAP PLACEHOLDER SAME AS WAVELENGTH MAP PLACEHOLDER slitMap = wlMap.copy() # PLOT CCDDATA OBJECT if self.debug and False: import matplotlib.pyplot as plt rotatedImg = np.rot90(slitMap.data, 1) rotatedImg = slitMap.data std = np.nanstd(slitMap.data) mean = np.nanmean(slitMap.data) vmax = mean + 3 * std vmin = mean - 3 * std plt.figure(figsize=(12, 5)) plt.imshow(rotatedImg, vmin=vmin, vmax=vmax, cmap="gray", alpha=1) plt.colorbar() plt.xlabel("y-axis", fontsize=10) plt.ylabel("x-axis", fontsize=10) plt.show() plt.close("all") self.log.debug("completed the ``create_placeholder_images`` method") return slitMap, wlMap, orderMap
[docs] def map_to_image(self, dispersionMapPath, orders=False): """*convert the dispersion map to images in the detector format showing pixel wavelength values and slit positions* **Key Arguments:** - ``dispersionMapPath`` -- path to the full dispersion map to convert to images - ``orders`` -- orders to add to the image map. List. Default *False* (add all orders) **Return:** - ``dispersion_image_filePath`` -- path to the FITS image with an extension for wavelength values and another for slit positions **Usage:** ```python mapImagePath = self.map_to_image(dispersionMapPath=mapPath) ``` """ self.log.debug("starting the ``map_to_image`` method") from soxspipe.commonutils.combiner import Combiner import numpy as np from astropy.io import fits import copy from fundamentals import fmultiprocess from soxspipe.commonutils import toolkit self.log.print("\n# CREATING 2D IMAGE MAP FROM DISPERSION SOLUTION\n\n") self.dispersionMapPath = dispersionMapPath kw = self.kw dp = self.detectorParams arm = self.arm self.map_to_image_displacement_threshold = self.recipeSettings["map_to_image_displacement_threshold"] # READ THE SPECTRAL FORMAT TABLE TO DETERMINE THE LIMITS OF THE TRACES orderNums, waveLengthMin, waveLengthMax = read_spectral_format( log=self.log, settings=self.settings, arm=self.arm ) combinedSlitImage = False combinedWlImage = False if orders: theseOrders = orders else: theseOrders = orderNums # DEFINE AN INPUT ARRAY inputArray = [ (order, minWl, maxWl) for order, minWl, maxWl in zip(orderNums, waveLengthMin, waveLengthMax) if order in theseOrders ] if self.debug or self.turnOffMP: turnOffMP = True else: turnOffMP = False if turnOffMP: results = fmultiprocess( log=self.log, function=self.order_to_image, inputArray=inputArray, poolSize=6, timeout=3600, turnOffMP=turnOffMP, ) else: # NUMPY CAN BE TRICKY WITH MP numThreads = "1" os.environ["OPENBLAS_NUM_THREADS"] = numThreads os.environ["OMP_NUM_THREADS"] = numThreads os.environ["BLAS_NUM_THREADS"] = numThreads results = fmultiprocess( log=self.log, function=self.order_to_image, inputArray=inputArray, poolSize=6, timeout=3600, turnOffMP=turnOffMP, ) del os.environ["OPENBLAS_NUM_THREADS"] del os.environ["OMP_NUM_THREADS"] del os.environ["BLAS_NUM_THREADS"] slitImages = [r[0] for r in results] wlImages = [r[1] for r in results] slitMap, wlMap, orderMap = self.create_placeholder_images(reverse=True) combinedSlitImage = Combiner(slitImages, dtype=np.float32) combinedSlitImage = combinedSlitImage.sum_combine() combinedWlImage = Combiner(wlImages, dtype=np.float32) combinedWlImage = combinedWlImage.sum_combine() combinedWlImage.data += wlMap.data combinedSlitImage.data += wlMap.data toolkit.frame_to_32(combinedWlImage) toolkit.frame_to_32(combinedSlitImage) toolkit.frame_to_32(orderMap) # GET THE EXTENSION (WITH DOT PREFIX) extension = os.path.splitext(dispersionMapPath)[1] filename = os.path.basename(dispersionMapPath).replace(extension, "_IMAGE.fits") dispersion_image_filePath = f"{self.productDir}/{filename}" # WRITE CCDDATA OBJECT TO FILE # from astropy.io import fits header = copy.deepcopy(self.dispMapHeader) header[kw("PRO_CATG")] = f"DISP_IMAGE_{arm}".upper() primary_hdu = fits.PrimaryHDU(combinedWlImage.data, header=header) primary_hdu.header["EXTNAME"] = "WAVELENGTH" image_hdu = fits.ImageHDU(combinedSlitImage.data) image_hdu.header["EXTNAME"] = "SLIT" image_hdu2 = fits.ImageHDU(orderMap.data) image_hdu2.header["EXTNAME"] = "ORDER" hdul = fits.HDUList([primary_hdu, image_hdu, image_hdu2]) hdul.verify("fix") hdul.writeto(dispersion_image_filePath, output_verify="exception", overwrite=True, checksum=True) self.log.debug("completed the ``map_to_image`` method") return dispersion_image_filePath
[docs] def order_to_image(self, orderInfo): """*convert a single order in the dispersion map to wavelength and slit position images* **Key Arguments:** - ``orderInfo`` -- tuple containing the order number to generate the images for, the minimum wavelength to consider (from format table) and maximum wavelength to consider (from format table). **Return:** - ``slitMap`` -- the slit map with order values filled - ``wlMap`` -- the wavelengths map with order values filled **Usage:** ```python slitMap, wlMap = self.order_to_image(order=order,minWl=minWl, maxWl=maxWl) ``` """ self.log.debug("starting the ``order_to_image`` method") import numpy as np from scipy.interpolate import griddata order, minWl, maxWl = orderInfo slitMap, wlMap, orderMap = self.create_placeholder_images(order=order) if np.count_nonzero(wlMap.data) == 0: return slitMap, wlMap # FIRST GENERATE A WAVELENGTH SURFACE - FINE WL, CHUNKY SLIT-POSITION wlRange = maxWl - minWl grid_res_wavelength = wlRange / 200 slitLength = self.detectorParams["slit_length"] grid_res_slit = slitLength / 20 halfGrid = (slitLength / 2) * 1.2 slitArray = np.arange(-halfGrid, halfGrid + grid_res_slit, grid_res_slit) wlArray = np.arange(minWl, maxWl, grid_res_wavelength) # ONE SINGLE-VALUE SLIT ARRAY FOR EVERY WAVELENGTH ARRAY bigSlitArray = np.repeat(slitArray, wlArray.shape[0]) # NOW THE BIG WAVELENGTH ARRAY bigWlArray = np.tile(wlArray, slitArray.shape[0]) iteration = 0 remainingPixels = 1 iterationLimit = 20 remainingCount = 1 # GET A COMPLETE LIST OF THE PIXEL WE NEED nan_indexes = np.argwhere(np.isnan(slitMap.data)) ally, allx = nan_indexes[:, 0], nan_indexes[:, 1] while remainingPixels and remainingCount and iteration < iterationLimit: iteration += 1 # GENERATE THE ORDER PIXEL TABLE FROM WL AND SLIT-POSITION GRID .. IF WITHIN THRESHOLD OF CENTRE OF DETECTOR PIXEL THEN INJECT INTO MAPS orderPixelTable, remainingCount = self.convert_and_fit( order=order, bigWlArray=bigWlArray, bigSlitArray=bigSlitArray, slitMap=slitMap, wlMap=wlMap, iteration=iteration, plots=False, ) if remainingCount < 3: break # PRINT COLUMN NAMES AND TYPES FOR DEBUGGING orderPixelTable = orderPixelTable.drop_duplicates(subset=["pixel_x", "pixel_y", "order"]) train_wlx = orderPixelTable["fit_x"].values train_wly = orderPixelTable["fit_y"].values train_wl = orderPixelTable["wavelength"].values train_sp = orderPixelTable["slit_position"].values targetX = orderPixelTable["pixel_x"].values targetY = orderPixelTable["pixel_y"].values if iteration == 1: # ADD MISSING PIXELS targetX = np.concatenate([targetX, allx]) targetY = np.concatenate([targetY, ally]) # USE LINEAR INTERPOLATION TO SEED RESULTS (FASTER THAN CUBIC; SUFFICIENT FOR SEEDING) bigWlArray = griddata((train_wlx, train_wly), train_wl, (targetX, targetY), method="cubic") bigSlitArray = griddata((train_wlx, train_wly), train_sp, (targetX, targetY), method="cubic") self.log.debug("completed the ``order_to_image`` method") return slitMap, wlMap
[docs] def convert_and_fit(self, order, bigWlArray, bigSlitArray, slitMap, wlMap, iteration, plots=False): """*convert wavelength and slit position grids to pixels* **Key Arguments:** - ``order`` -- the order being considered - ``bigWlArray`` -- 1D array of all wavelengths to be converted - ``bigSlitArray`` -- 1D array of all split-positions to be converted (same length as `bigWlArray`) - ``slitMap`` -- place-holder image hosting fitted pixel slit-position values - ``wlMap`` -- place-holder image hosting fitted pixel wavelength values - ``iteration`` -- the iteration index (used for CL reporting) - ``plots`` -- show plot of the slit-map **Return:** - ``orderPixelTable`` -- dataframe containing unfitted pixel info - ``remainingCount`` -- number of remaining pixels in orderTable **Usage:** ```python orderPixelTable = self.convert_and_fit( order=order, bigWlArray=bigWlArray, bigSlitArray=bigSlitArray, slitMap=slitMap, wlMap=wlMap) ``` """ self.log.debug("starting the ``convert_and_fit`` method") import pandas as pd import numpy as np # CREATE PANDAS DATAFRAME WITH LARGE ARRAYS - ONE ROW PER # WAVELENGTH-SLIT GRID CELL myDict = { "order": np.ones(bigWlArray.shape[0]) * order, "wavelength": bigWlArray, "slit_position": bigSlitArray, } orderPixelTable = pd.DataFrame(myDict) # GET DETECTOR PIXEL POSITIONS FOR ALL WAVELENGTH-SLIT GRID CELLS orderPixelTable = dispersion_map_to_pixel_arrays( log=self.log, dispersionMapPath=self.dispersionMapPath, orderPixelTable=orderPixelTable, trimColumns=True ) # INTEGER PIXEL VALUES & FIT DISPLACEMENTS FROM PIXEL CENTRES orderPixelTable["pixel_x"] = np.round(orderPixelTable["fit_x"].values) orderPixelTable["pixel_y"] = np.round(orderPixelTable["fit_y"].values) orderPixelTable["residual_x"] = orderPixelTable["fit_x"] - orderPixelTable["pixel_x"] orderPixelTable["residual_y"] = orderPixelTable["fit_y"] - orderPixelTable["pixel_y"] orderPixelTable["residual_xy"] = np.sqrt( np.square(orderPixelTable["residual_x"]) + np.square(orderPixelTable["residual_y"]) ) # DEFINE COLUMN TYPES orderPixelTable["order"] = orderPixelTable["order"].astype(np.int16) orderPixelTable["wavelength"] = orderPixelTable["wavelength"].astype(np.float32) orderPixelTable["slit_position"] = orderPixelTable["slit_position"].astype(np.float32) orderPixelTable["pixel_x"] = orderPixelTable["pixel_x"].astype(np.int16) orderPixelTable["pixel_y"] = orderPixelTable["pixel_y"].astype(np.int16) # ADD A COUNT COLUMN FOR THE NUMBER OF SMALL SLIT/WL PIXELS FALLING IN # LARGE DETECTOR PIXELS count = orderPixelTable.groupby(["pixel_x", "pixel_y"]).size().reset_index(name="count") count["count"] = count["count"].astype(np.int16) orderPixelTable = pd.merge( orderPixelTable, count, how="left", left_on=["pixel_x", "pixel_y"], right_on=["pixel_x", "pixel_y"] ) orderPixelTable = orderPixelTable.sort_values(["order", "pixel_x", "pixel_y", "residual_xy"]) # FILTER TO WL/SLIT POSITION CLOSE ENOUGH TO CENTRE OF PIXEL mask = orderPixelTable["residual_xy"] < self.map_to_image_displacement_threshold # KEEP ONLY VALUES CLOSEST TO CENTRE OF PIXEL newPixelValue = orderPixelTable.loc[mask].drop_duplicates(subset=["pixel_x", "pixel_y"], keep="first") # REMOVE PIXELS FOUND IN newPixelValue FROM orderPixelTable orderPixelTable = ( newPixelValue[["pixel_x", "pixel_y"]] .merge(orderPixelTable, on=["pixel_x", "pixel_y"], how="right", indicator=True) .query('_merge == "right_only"') .drop(columns=["_merge"]) ) remainingCount = orderPixelTable.drop_duplicates(subset=["pixel_x", "pixel_y"], keep="first") # ADD FITTED PIXELS TO PLACE HOLDER IMAGES for xx, yy, wavelength, slit_position in zip( newPixelValue["pixel_x"].values.astype(int), newPixelValue["pixel_y"].values.astype(int), newPixelValue["wavelength"].values, newPixelValue["slit_position"].values, ): try: wlMap.data[yy, xx] = np.where(np.isnan(wlMap.data[yy, xx]), wavelength, wlMap.data[yy, xx]) slitMap.data[yy, xx] = np.where(np.isnan(slitMap.data[yy, xx]), slit_position, slitMap.data[yy, xx]) except IndexError: # PIXELS OUTSIDE OF DETECTOR EDGES - IGNORE pass sys.stdout.flush() sys.stdout.write("\x1b[1A\x1b[2K") percentageFound = (1 - (np.count_nonzero(np.isnan(wlMap.data)) / np.count_nonzero(wlMap.data))) * 100 try: self.log.print( f"ORDER {order:02d}, iteration {iteration:02d}. {percentageFound:0.2f}% order pixels now fitted." ) except: pass if plots: from matplotlib import cm import matplotlib.pyplot as plt # PLOT CCDDATA OBJECT rotatedImg = slitMap.data if self.axisA == "x": rotatedImg = np.rot90(rotatedImg, 1) rotatedImg = np.flipud(rotatedImg) std = np.nanstd(rotatedImg) mean = np.nanmean(rotatedImg) cmap = cm.gray cmap.set_bad(color="#ADD8E6") vmax = np.nanmax(rotatedImg) vmin = np.nanmin(rotatedImg) plt.figure(figsize=(24, 10)) plt.imshow(rotatedImg, vmin=vmin, vmax=vmax, cmap=cmap, alpha=1) if self.axisA == "x": plt.gca().invert_yaxis() plt.colorbar() plt.ylabel(f"{self.axisA}-axis", fontsize=12) plt.xlabel(f"{self.axisB}-axis", fontsize=12) plt.show() plt.close("all") remainingCount = len(remainingCount.index) self.log.debug("completed the ``convert_and_fit`` method") return orderPixelTable, remainingCount
def _create_dispersion_map_qc_plot( self, xcoeff, ycoeff, orderDeg, wavelengthDeg, slitDeg, orderPixelTable, missingLines, allClippedLines, dispMap=False, dispMapImage=False, ): """*create the QC plot for the dispersion map solution* **Key Arguments:** - ``xcoeff`` -- the x-coefficients - ``ycoeff`` -- the y-coefficients - ``orderDeg`` -- degree of the order fitting - ``wavelengthDeg`` -- degree of wavelength fitting - ``slitDeg`` -- degree of the slit fitting (False for single pinhole) - ``orderPixelTable`` -- a panda's data-frame containing wavelength,order,slit_index,slit_position,detector_x,detector_y - ``missingLines`` -- lines not detected on the image - `allClippedLines` -- lines clipped during dispersion solution fitting - `dispMap` -- path to dispersion map. Default *False* - `dispMapImage` -- the 2D dispersion map image **Return:** - ``res_plots`` -- path the the output QC plot ``` """ self.log.debug("starting the ``create_dispersion_map_qc_plot`` method") import numpy as np from astropy.visualization import hist import matplotlib.pyplot as plt import pandas as pd from soxspipe.commonutils.toolkit import qc_settings_plot_tables from astropy.stats import sigma_clipped_stats arm = self.arm kw = self.kw dp = self.detectorParams rotateImage = dp["rotate-qc-plot"] flipImage = dp["flip-qc-plot"] gridLinePixelTable = False # CREATE THE GRID LINES FOR THE FULL SPATIAL SOLUTION PLOTS (MPH) if not isinstance(dispMapImage, bool): from soxspipe.commonutils.toolkit import create_dispersion_solution_grid_lines_for_plot gridLinePixelTable, interOrderMask = create_dispersion_solution_grid_lines_for_plot( log=self.log, dispMap=dispMap, dispMapImage=dispMapImage, associatedFrame=self.pinholeFrame, kw=kw, skylines=False, slitPositions=self.uniqueSlitPos, ) # DROP MISSING VALUES orderPixelTable.dropna(axis="index", how="any", subset=["residuals_x"], inplace=True) orderPixelTable["residuals_xy"] = np.sqrt( np.square(orderPixelTable["residuals_x"]) + np.square(orderPixelTable["residuals_y"]) ) mean_res = np.mean(orderPixelTable["residuals_xy"]) std_res = np.std(orderPixelTable["residuals_xy"]) median_res = np.median(orderPixelTable["residuals_xy"]) # mean_x_res = abs(orderPixelTable["residuals_x"]).mean() # mean_y_res = abs(orderPixelTable["residuals_y"]).mean() mean_x_res = abs(orderPixelTable["residuals_x"]).mean() mean_y_res = abs(orderPixelTable["residuals_y"]).mean() median_x_res = np.median(abs(orderPixelTable["residuals_x"])) median_y_res = np.median(abs(orderPixelTable["residuals_y"])) # ROTATE THE IMAGE FOR BETTER LAYOUT rotatedImg = np.ma.array(self.pinholeFrameMasked.data, mask=self.pinholeFrameMasked.mask, fill_value=0).filled() if rotateImage: rotatedImg = np.rot90(rotatedImg, rotateImage / 90) if flipImage: rotatedImg = np.flipud(rotatedImg) if not rotateImage: aLen = rotatedImg.shape[0] - 1 # OBSERVED VALUES orderPixelTable[f"observed_{self.axisA}"] = aLen - orderPixelTable[f"observed_{self.axisA}"] allClippedLines[f"observed_{self.axisA}"] = aLen - allClippedLines[f"observed_{self.axisA}"] # DETECTOR SHIFTED VALUES missingLines[f"detector_{self.axisA}_shifted"] = aLen - missingLines[f"detector_{self.axisA}_shifted"] allClippedLines[f"detector_{self.axisA}_shifted"] = ( aLen - allClippedLines[f"detector_{self.axisA}_shifted"] ) orderPixelTable[f"detector_{self.axisA}_shifted"] = ( aLen - orderPixelTable[f"detector_{self.axisA}_shifted"] ) # ORIGINAL DETECTOR VALUES allClippedLines[f"detector_{self.axisA}"] = aLen - allClippedLines[f"detector_{self.axisA}"] missingLines[f"detector_{self.axisA}"] = aLen - missingLines[f"detector_{self.axisA}"] orderPixelTable[f"detector_{self.axisA}"] = aLen - orderPixelTable[f"detector_{self.axisA}"] # FITTED VALUES orderPixelTable[f"fit_{self.axisA}"] = aLen - orderPixelTable[f"fit_{self.axisA}"] if not isinstance(gridLinePixelTable, bool): gridLinePixelTable[f"fit_{self.axisA}"] = aLen - gridLinePixelTable[f"fit_{self.axisA}"] # orderPixelTable[f"observed_{self.axisA}"] = aLen - orderPixelTable[f"observed_{self.axisA}"] # orderPixelTable[f"observed_{self.axisA}"] = aLen - orderPixelTable[f"observed_{self.axisA}"] # a = plt.figure(figsize=(40, 15)) if rotatedImg.shape[0] / rotatedImg.shape[1] > 0.8: # SOXS NIR fig = plt.figure(figsize=(6, 20), constrained_layout=True) # CREATE THE GRID OF AXES if self.arcFrame: gs = fig.add_gridspec(12, 4) sizeAx = fig.add_subplot(gs[8:9, :]) gapAx = fig.add_subplot(gs[9:10, :]) settingsAx = fig.add_subplot(gs[10:, 2:]) qcAx = fig.add_subplot(gs[11:, 0:2]) else: gs = fig.add_gridspec(10, 4) settingsAx = fig.add_subplot(gs[8:, 2:]) qcAx = fig.add_subplot(gs[8:, 0:2]) toprow = fig.add_subplot(gs[0:2, :]) midrow = fig.add_subplot(gs[2:4, :]) bottomleft = fig.add_subplot(gs[4:6, 0:2]) bottomright = fig.add_subplot(gs[4:6, 2:]) fwhmAx = fig.add_subplot(gs[6:7, :]) resAx = fig.add_subplot(gs[7:8, :]) else: # SOXS VIS if self.firstGuessMap: fig = plt.figure(figsize=(6, 18), constrained_layout=True) else: fig = plt.figure(figsize=(6, 14), constrained_layout=True) # CREATE THE GRID OF AXES # CREATE THE GRID OF AXES if self.arcFrame: gs = fig.add_gridspec(12, 4) sizeAx = fig.add_subplot(gs[8:9, :]) gapAx = fig.add_subplot(gs[9:10, :]) settingsAx = fig.add_subplot(gs[10:, 2:]) qcAx = fig.add_subplot(gs[11:, 0:2]) else: gs = fig.add_gridspec(10, 4) settingsAx = fig.add_subplot(gs[8:, 2:]) qcAx = fig.add_subplot(gs[8:, 0:2]) toprow = fig.add_subplot(gs[0:2, :]) midrow = fig.add_subplot(gs[2:4, :]) bottomleft = fig.add_subplot(gs[4:6, 0:2]) bottomright = fig.add_subplot(gs[4:6, 2:]) fwhmAx = fig.add_subplot(gs[6:7, :]) resAx = fig.add_subplot(gs[7:8, :]) # TOP ROW - IMAGE WITH LINE POSITIONS OVERLAID vmax = self.meanFrameFlux + 25 * self.stdFrameFlux vmin = self.meanFrameFlux toprow.imshow(rotatedImg, vmin=vmin, vmax=vmax, cmap="gray", alpha=0.5) toprow.set_title("observed arc-line positions (post-clipping)", fontsize=10) alphaBoost = 1.0 if not self.firstGuessMap or self.debug: alphaBoost = 1.7 if self.debug: # PLOT WHERE LINES GET SHIFTED TO toprow.scatter( missingLines[f"detector_{self.axisB}"], missingLines[f"detector_{self.axisA}"], marker="o", c="black", s=10, alpha=0.2 * alphaBoost, linewidths=0.5, label="missed lines - original location", ) toprow.scatter( allClippedLines[f"detector_{self.axisB}"], allClippedLines[f"detector_{self.axisA}"], marker="x", c="black", s=10, alpha=0.2 * alphaBoost, linewidths=0.5, label="clipped lines - original location", ) toprow.scatter( orderPixelTable[f"detector_{self.axisB}"], orderPixelTable[f"detector_{self.axisA}"], marker="o", c="black", s=10, alpha=0.2 * alphaBoost, linewidths=0.5, label="detected lines - original location", ) # PLOT WHERE LINES GET SHIFTED TO toprow.scatter( missingLines[f"detector_{self.axisB}_shifted"], missingLines[f"detector_{self.axisA}_shifted"], marker="o", c="red", s=10, alpha=0.2 * alphaBoost, linewidths=0.5, label="missed lines - shifted location", ) toprow.scatter( allClippedLines[f"detector_{self.axisB}_shifted"], allClippedLines[f"detector_{self.axisA}_shifted"], marker="x", c="red", s=10, alpha=0.2 * alphaBoost, linewidths=0.5, label="clipped lines - shifted location", ) toprow.scatter( orderPixelTable[f"detector_{self.axisB}_shifted"], orderPixelTable[f"detector_{self.axisA}_shifted"], marker="o", c="red", s=10, alpha=0.2 * alphaBoost, linewidths=0.5, label="detected lines - shifted location", ) # PLOT WHERE LINES GET SHIFTED TO toprow.scatter( allClippedLines[f"observed_{self.axisB}"], allClippedLines[f"observed_{self.axisA}"], marker="x", c="blue", s=10, alpha=0.2 * alphaBoost, linewidths=0.5, label="clipped lines - observed location", ) toprow.scatter( orderPixelTable[f"observed_{self.axisB}"], orderPixelTable[f"observed_{self.axisA}"], marker="o", c="blue", s=10, alpha=0.2 * alphaBoost, linewidths=0.5, label="detected lines - observed location", ) else: if isinstance(missingLines, pd.core.frame.DataFrame): toprow.scatter( missingLines[f"detector_{self.axisB}_shifted"], missingLines[f"detector_{self.axisA}_shifted"], marker="o", c="black", s=7, alpha=0.1 * alphaBoost, linewidths=0.5, label="undetected line location", ) # toprow.scatter(orderPixelTable[f"detector_{self.axisB}_shifted"], orderPixelTable[f"detector_{self.axisA}_shifted"], marker='o', c='black', s=20, alpha=0.4 * alphaBoost, linewidths=0.5, label="undetected line location") # toprow.scatter(allClippedLines[f"detector_{self.axisB}_shifted"], allClippedLines[f"detector_{self.axisA}_shifted"], marker='o', c='black', s=20, alpha=0.4 * alphaBoost, linewidths=0.5, label="undetected line location") if len(allClippedLines.index): pass mask = allClippedLines["dropped"] == True if self.firstGuessMap: toprow.scatter( allClippedLines.loc[mask][f"observed_{self.axisB}"], allClippedLines.loc[mask][f"observed_{self.axisA}"], marker="o", c="blue", s=5, alpha=0.3 * alphaBoost, linewidths=0.5, label="dropped multi-pinhole set", ) else: toprow.scatter( allClippedLines.loc[mask][f"observed_{self.axisB}"], allClippedLines.loc[mask][f"observed_{self.axisA}"], marker="o", c="blue", s=5, alpha=0.3 * alphaBoost, linewidths=0.5, label="dropped pinhole", ) toprow.scatter( allClippedLines.loc[~mask][f"observed_{self.axisB}"], allClippedLines.loc[~mask][f"observed_{self.axisA}"], marker="o", c="green", s=5, alpha=0.3 * alphaBoost, linewidths=0.5 * alphaBoost, ) toprow.scatter( allClippedLines.loc[~mask][f"observed_{self.axisB}"], allClippedLines.loc[~mask][f"observed_{self.axisA}"], marker="x", c="red", s=5, alpha=0.3 * alphaBoost, linewidths=0.5, label="clipped during dispersion solution fitting", ) if len(orderPixelTable.index): toprow.scatter( orderPixelTable[f"observed_{self.axisB}"], orderPixelTable[f"observed_{self.axisA}"], marker="o", c="green", s=5, alpha=0.3 * alphaBoost, linewidths=0.5, label="detected line location", ) # toprow.scatter(orderPixelTable[f"tilt_corrected_{self.axisB}"], orderPixelTable[f"tilt_corrected_{self.axisA}"], marker='o', c='purple', s=8, alpha=0.3 * alphaBoost, linewidths=0.5 * alphaBoost, ) toprow.set_ylabel(f"{self.axisA}-axis", fontsize=12) toprow.set_xlabel(f"{self.axisB}-axis", fontsize=12) toprow.tick_params(axis="both", which="major", labelsize=9) if self.arm == "VIS": toprow.legend(loc="upper right", bbox_to_anchor=(1.0, -0.2), fontsize=4) else: toprow.legend(loc="upper right", bbox_to_anchor=(1.0, -0.15), fontsize=4) toprow.set_xlim([0, rotatedImg.shape[1]]) if self.axisA == "x": toprow.invert_yaxis() toprow.set_ylim([0, rotatedImg.shape[0]]) midrow.imshow(rotatedImg, vmin=vmin, vmax=vmax, cmap="gray", alpha=0.5) midrow.set_title("global dispersion solution", fontsize=10) # ADD FULL DISPERSION SOLUTION GRID-LINES TO PLOT if not isinstance(gridLinePixelTable, bool): for l in range(int(gridLinePixelTable["line"].max())): mask = gridLinePixelTable["line"] == l if l == 1: midrow.plot( gridLinePixelTable.loc[mask][f"fit_{self.axisB}"], gridLinePixelTable.loc[mask][f"fit_{self.axisA}"], "w-", linewidth=0.2, alpha=0.5 * alphaBoost, color="blue", label="dispersion solution", ) toprow.plot( gridLinePixelTable.loc[mask][f"fit_{self.axisB}"], gridLinePixelTable.loc[mask][f"fit_{self.axisA}"], "w-", linewidth=0.2, alpha=0.5 * alphaBoost, color="blue", label="dispersion solution", ) else: midrow.plot( gridLinePixelTable.loc[mask][f"fit_{self.axisB}"], gridLinePixelTable.loc[mask][f"fit_{self.axisA}"], "w-", linewidth=0.2, alpha=0.5 * alphaBoost, color="blue", ) toprow.plot( gridLinePixelTable.loc[mask][f"fit_{self.axisB}"], gridLinePixelTable.loc[mask][f"fit_{self.axisA}"], "w-", linewidth=0.2, alpha=0.5 * alphaBoost, color="blue", ) else: midrow.scatter( orderPixelTable[f"fit_{self.axisB}"], orderPixelTable[f"fit_{self.axisA}"], marker="o", c="blue", s=orderPixelTable[f"residuals_xy"] * 30, alpha=0.1 * alphaBoost, label="fitted line (size proportional to line-fit residual)", ) midrow.set_ylabel(f"{self.axisA}-axis", fontsize=12) midrow.set_xlabel(f"{self.axisB}-axis", fontsize=12) midrow.tick_params(axis="both", which="major", labelsize=9) midrow.set_xlim([0, rotatedImg.shape[1]]) if self.axisA == "x": midrow.invert_yaxis() midrow.set_ylim([0, rotatedImg.shape[0]]) if self.arm == "VIS": midrow.legend(loc="upper right", bbox_to_anchor=(1.0, -0.2), fontsize=4) else: midrow.legend(loc="upper right", bbox_to_anchor=(1.0, -0.15), fontsize=4) # PLOT THE RESIDUALS plt.subplots_adjust(top=0.92) bottomleft.scatter( orderPixelTable[f"residuals_{self.axisA}"], orderPixelTable[f"residuals_{self.axisB}"], alpha=0.1 ) bottomleft.set_xlabel(f"{self.axisA} residual (mean: {mean_x_res:2.2f} pix)") bottomleft.set_ylabel(f"{self.axisB} residual (mean: {mean_y_res:2.2f} pix)") bottomleft.tick_params(axis="both", which="major", labelsize=9) hist( orderPixelTable["residuals_xy"], bins="scott", ax=bottomright, histtype="stepfilled", alpha=0.7, density=True, ) bottomright.set_xlabel("xy residual") bottomright.tick_params(axis="both", which="major", labelsize=9) subtitle = f"mean res: {mean_res:2.2f} pix, res stdev: {std_res:2.2f}" if self.firstGuessMap: fig.suptitle( f"residuals of global dispersion solution fitting - {arm} multi-pinhole\n{subtitle}", fontsize=10, y=0.99, ) else: fig.suptitle( f"residuals of global dispersion solution fitting - {arm} single pinhole\n{subtitle}", fontsize=10, y=0.99, ) orderPixelTable_groups = orderPixelTable.groupby(["order"]) if self.arcFrame: mean_fwhm, median, std_fwhm = sigma_clipped_stats( orderPixelTable["fwhm_slit_px"].values, sigma=3.0, stdfunc="std", cenfunc="mean", maxiters=3 ) mean_r, median, std_r = sigma_clipped_stats( orderPixelTable["R_slit"].values, sigma=3.0, stdfunc="std", cenfunc="mean", maxiters=3 ) else: mean_fwhm, median, std_fwhm = sigma_clipped_stats( orderPixelTable["fwhm_pin_px"].values, sigma=3.0, stdfunc="std", cenfunc="mean", maxiters=3 ) mean_r, median, std_r = sigma_clipped_stats( orderPixelTable["R_pin"].values, sigma=3.0, stdfunc="std", cenfunc="mean", maxiters=3 ) lowerFwhm = mean_fwhm - 3 * std_fwhm upperFwhm = mean_fwhm + 3 * std_fwhm lowerR = mean_r - 3 * std_r upperR = mean_r + 3 * std_r if self.arcFrame: # UNPACK ORDER TABLE TO GET SLIT HEIGHT IN PX orderPolyTable, orderGeoTable, orderMetaTable = unpack_order_table( log=self.log, orderTablePath=self.orderTable, extend=0.0 ) orderGeoTable[f"{self.axisA}_u"] = orderGeoTable[f"{self.axisA}coord_edgeup"].astype(int) orderGeoTable[f"{self.axisA}_l"] = orderGeoTable[f"{self.axisA}coord_edgelow"].astype(int) dispMapDF, interOrderMask = twoD_disp_map_image_to_dataframe( log=self.log, slit_length=14, twoDMapPath=dispMapImage, associatedFrame=self.arcFrame, kw=self.kw ) # MERGE DATAFRAMES orderGeoTableLower = orderGeoTable.merge( dispMapDF, left_on=[f"{self.axisA}_l", f"{self.axisB}coord"], right_on=[self.axisA, self.axisB] ) orderGeoTableLower.rename(columns={"order_x": "order"}, inplace=True) orderGeoTableLower = orderGeoTableLower[ [ "order", "x", "y", "wavelength", "slit_position", "flux", "pixelScale", f"{self.axisA}coord_centre", f"{self.axisA}coord_edgelow", ] ] orderGeoTableUpper = orderGeoTable.merge( dispMapDF, left_on=[f"{self.axisA}_u", f"{self.axisB}coord"], right_on=[self.axisA, self.axisB] ) orderGeoTableUpper.rename(columns={"order_x": "order"}, inplace=True) orderGeoTableUpper = orderGeoTableUpper[ [ "order", "x", "y", "wavelength", "slit_position", "flux", "pixelScale", f"{self.axisA}coord_centre", f"{self.axisA}coord_edgeup", ] ] orderGeoTable = orderGeoTableLower.merge( orderGeoTableUpper, left_on=[f"{self.axisA}coord_centre", self.axisB], right_on=[f"{self.axisA}coord_centre", self.axisB], suffixes=("_l", "_u"), ) orderGeoTable.rename(columns={"order_l": "order"}, inplace=True) orderGeoTable["wavelength"] = (orderGeoTable["wavelength_l"] + orderGeoTable["wavelength_u"]) / 2.0 orderGeoTable["slitLengthPixelsInt"] = np.abs( orderGeoTable[f"{self.axisA}_u"] - orderGeoTable[f"{self.axisA}_l"] ) orderGeoTable["slitLengthPixels"] = np.abs( orderGeoTable[f"{self.axisA}coord_edgeup"] - orderGeoTable[f"{self.axisA}coord_edgelow"] ) orderGeoTable["slitLengthArcsec"] = np.abs( orderGeoTable[f"slit_position_u"] - orderGeoTable[f"slit_position_l"] ) orderGeoTable["pixelScale"] = orderGeoTable["slitLengthArcsec"] / orderGeoTable["slitLengthPixelsInt"] orderGeoTable["slitLengthArcsec"] = orderGeoTable["slitLengthPixels"] * orderGeoTable["pixelScale"] for name, group in orderPixelTable_groups: thisOrder = group["order"].values[0] if self.arcFrame: slitWidth = self.arcFrame.header[kw(f"SLIT_{arm}")].replace("SLIT", "").split("x")[0] fwhmAx.set_title(f'Line FWHM measured via {slitWidth}" slit arc-lamp frame', fontsize=9) fwhmAx.scatter(group["wavelength"], group["fwhm_slit_px"], alpha=0.1) mean_fwhm, median, std_fwhm = sigma_clipped_stats( group["fwhm_slit_px"].values, sigma=3.0, stdfunc="std", cenfunc="mean", maxiters=3 ) fwhmAx.set_ylabel("FWHM (px)", fontsize=9) mean_wavelength = group["wavelength"].mean() fwhmAx.errorbar(mean_wavelength, mean_fwhm, yerr=std_fwhm, fmt="o", color="black", alpha=0.6) fwhmAx.tick_params(axis="both", which="major", labelsize=8) x_limits = fwhmAx.get_xlim() resAx.set_title(f'Resolution measured via {slitWidth}" slit arc-lamp frame', fontsize=9) resAx.scatter(group["wavelength"], group["R_slit"], alpha=0.1) mean_resol, median, std_resol = sigma_clipped_stats( group["R_slit"].values, sigma=3.0, stdfunc="std", cenfunc="mean", maxiters=3 ) # ADD TO THE POINT THE ERROR BAR CONTAINED IN STD_RED resAx.errorbar(mean_wavelength, mean_resol, yerr=std_resol, fmt="o", color="black", alpha=0.6) if thisOrder not in [24]: # FILTER DATA FRAME # FIRST CREATE THE MASK mask = orderGeoTable["order"] == thisOrder filteredDf = orderGeoTable.loc[mask] sizeAx.plot(filteredDf["wavelength"], filteredDf["slitLengthArcsec"]) sizeAx.set_xlabel("wavelength (nm)", fontsize=9) sizeAx.set_ylabel("slit height\n(arcsec)", fontsize=9) sizeAx.set_xlim(x_limits) sizeAx.tick_params(axis="both", which="major", labelsize=8) sizeAx.set_title(f"Slit height as measured between the lower and upper order edges", fontsize=9) # PLOTTING THE ORDER GAP DISTANCES if thisOrder not in [23]: mask = orderGeoTable["order"] == thisOrder order_l = orderGeoTable[mask] mask = orderGeoTable["order"] == thisOrder + 1 order_u = orderGeoTable[mask] if len(order_l.index) and len(order_u.index): merged_ul = order_l.merge(order_u, on=[self.axisB], suffixes=("_l", "_u")) merged_ul["order_distance"] = ( merged_ul[f"{self.axisA}coord_edgelow_u"] - merged_ul[f"{self.axisA}coord_edgeup_l"] ) pixel_scale_mean = (merged_ul["pixelScale_l"] + merged_ul["pixelScale_u"]) / (2.0) gapAx.plot( merged_ul["wavelength_l"], merged_ul["order_distance"] * pixel_scale_mean, label=thisOrder, ) gapAx.set_xlabel("wavelength (nm)", fontsize=9) gapAx.set_ylabel("inter-order\ngap (arcsec)", fontsize=9) gapAx.set_xlim(x_limits) gapAx.tick_params(axis="both", which="major", labelsize=8) gapAx.set_title(f"Inter-order gap measured between adjacent orders", fontsize=9) else: fwhmAx.set_title('Line FWHM measured via 0.5" pinhole arc-lamp frame', fontsize=9) fwhmAx.scatter(group["wavelength"], group["fwhm_pin_px"], alpha=0.1) mean_fwhm = group["fwhm_pin_px"].mean() std_fwhm = group["fwhm_pin_px"].std() fwhmAx.set_ylabel("FWHM (px)", fontsize=9) mean_wavelength = group["wavelength"].mean() fwhmAx.errorbar(mean_wavelength, mean_fwhm, yerr=std_fwhm, fmt="o", color="black", alpha=0.6) fwhmAx.tick_params(axis="both", which="major", labelsize=8) x_limits = fwhmAx.get_xlim() resAx.set_title('Resolution as measured via 0.5" pinhole arc-lamp frame', fontsize=9) resAx.scatter(group["wavelength"], group["R_pin"], alpha=0.1) # CALCULATE THE MEAN AND STD DEV OF THE GROUP AND ADD TO THE PLOT mean_resol = group["R_pin"].mean() std_resol = group["R_pin"].std() # ADD TO THE POINT THE ERROR BAR CONTAINED IN STD_RED resAx.errorbar(mean_wavelength, mean_resol, yerr=std_resol, fmt="o", color="black", alpha=0.6) resAx.set_ylabel("R", fontsize=9) resAx.tick_params(axis="both", which="major", labelsize=8) # resAx.scatter(orderPixelTable["wavelength"], orderPixelTable["R"], alpha=0.1) resAx.set_xlabel("wavelength (nm)", fontsize=9) fwhmAx.set_ylim([lowerFwhm, upperFwhm]) resAx.set_ylim([lowerR, upperR]) resAx.set_xlim(x_limits) utcnow = datetime.now(timezone.utc) utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S") # GET FILENAME FOR THE RESIDUAL PLOT if not self.sofName: res_plots = filenamer(log=self.log, frame=self.pinholeFrame, settings=self.settings) res_plots = res_plots.replace(".fits", ".pdf") else: polyOrders = [orderDeg, wavelengthDeg, slitDeg] if isinstance(orderDeg, list): merged_list = [] for sublist in polyOrders: merged_list.extend(sublist) polyOrders = merged_list polyOrders[:] = [str(l) for l in polyOrders] polyOrders = "".join(polyOrders) res_plots = self.sofName + f"_RESIDUALS_{polyOrders}.pdf" if self.firstGuessMap: filePath = f"{self.qcDir}/{res_plots}" else: filePath = f"{self.qcDir}/{res_plots}" self.products = pd.concat( [ self.products, pd.Series( { "soxspipe_recipe": self.recipeName, "product_label": "DISP_MAP_RES", "file_name": res_plots, "file_type": "PDF", "obs_date_utc": self.dateObs, "reduction_date_utc": utcnow, "product_desc": f"{self.arm} dispersion solution QC plots", "file_path": filePath, "label": "QC", } ) .to_frame() .T, ], ignore_index=True, ) qc_settings_plot_tables( log=self.log, qc=self.qc, qcAx=qcAx, settings={**self.recipeSettings, **{"exptime": self.exptime}}, settingsAx=settingsAx, ) plt.tight_layout() if self.debug: plt.show() if not self.settings["tune-pipeline"]: plt.savefig(filePath, dpi=360, format="pdf") plt.close("all") if self.settings["tune-pipeline"]: import codecs filePath = f"residuals.txt" exists = os.path.exists(filePath) if not exists: with codecs.open(filePath, encoding="utf-8", mode="w") as writeFile: writeFile.write( f"polyOrders,mean_x_res,mean_y_res,mean_res,std_res,median_res,median_x_res,median_y_res,CLINE \n" ) with codecs.open(filePath, encoding="utf-8", mode="a") as writeFile: writeFile.write( f"{polyOrders},{mean_x_res:0.4f},{mean_y_res:0.4f},{mean_res:2.4f},{std_res:2.4f},{median_res:2.4f},{median_x_res:2.4f},{median_y_res:2.4f},{self.CLINE}\n" ) self.log.debug("completed the ``create_dispersion_map_qc_plot`` method") return res_plots def _clip_on_measured_line_metrics(self, orderPixelTable): """*clip lines & sets of lines based on measured line metrics (from daostarfinder etc)* **Key Arguments:** - `orderPixelTable` -- panda's data-frame containing measure line metrics **Return:** - `orderPixelTable` -- the data-frame with clipped lines indicated in the "sigma_clipped" column """ self.log.debug("starting the ``_clip_on_measured_line_metrics`` method") import matplotlib.pyplot as plt from astropy.stats import sigma_clip, sigma_clipped_stats from astropy.visualization import hist # LAYOUT THE FIGURE fig = plt.figure(figsize=(6, 12), constrained_layout=True) gs = fig.add_gridspec(8, 4) toprow = fig.add_subplot(gs[0:2, :]) midrow = fig.add_subplot(gs[2:4, :]) midrow2 = fig.add_subplot(gs[4:6, :]) if self.firstGuessMap: bottomleft = fig.add_subplot(gs[6:8, 0:2]) bottomright = fig.add_subplot(gs[6:8, 2:]) toprow.set_title("Pre-fitting QC Clipping on Pinhole-Sets", fontsize=10) columnsNoStrings = list(orderPixelTable.columns) try: columnsNoStrings.remove("ion") except: pass if self.firstGuessMap: # GROUP BY ARC LINES (MPH SETS) lineGroups = ( orderPixelTable.loc[(orderPixelTable["dropped"] == False)][columnsNoStrings] .groupby(["wavelength", "order"]) .std() ) lineGroups = lineGroups.reset_index() # SIGMA-CLIP THE DATA ON SCATTER # masked_residuals = sigma_clip( # lineGroups["xy_diff"], sigma_lower=5000, sigma_upper=7, maxiters=5, cenfunc='median', stdfunc='mad_std') # lineGroups["sigma_clipped_scatter"] = masked_residuals.mask # SIGMA-CLIP THE DATA ON SCATTER lineGroups["sigma_clipped_scatter"] = False masked_residuals = sigma_clip( lineGroups["x_diff"], sigma_lower=5000, sigma_upper=7, maxiters=3, cenfunc="median", stdfunc="mad_std" ) lineGroups["sigma_clipped_x"] = masked_residuals.mask masked_residuals = sigma_clip( lineGroups["y_diff"], sigma_lower=5000, sigma_upper=7, maxiters=3, cenfunc="median", stdfunc="mad_std" ) lineGroups["sigma_clipped_y"] = masked_residuals.mask masked_residuals = sigma_clip( lineGroups["xy_diff"], sigma_lower=5000, sigma_upper=7, maxiters=3, cenfunc="median", stdfunc="mad_std" ) lineGroups["sigma_clipped_xy"] = masked_residuals.mask lineGroups.loc[ ( (lineGroups["sigma_clipped_y"] == True) | (lineGroups["sigma_clipped_x"] == True) | (lineGroups["sigma_clipped_xy"] == True) ), "sigma_clipped_scatter", ] = True bottomleft.scatter( x=lineGroups["x_diff"], # numpy array of x-points y=lineGroups["y_diff"], # numpy array of y-points # 1 number or array of areas for each datapoint (i.e. point size) s=1, c="black", # color or sequence of color, optional, default marker="x", alpha=0.6, label="arc line shift from predicted position", ) bottomleft.scatter( # numpy array of x-points x=lineGroups.loc[(lineGroups["sigma_clipped_scatter"] == True)]["x_diff"], # numpy array of y-points y=lineGroups.loc[(lineGroups["sigma_clipped_scatter"] == True)]["y_diff"], # 1 number or array of areas for each datapoint (i.e. point size) s=5, c="red", # color or sequence of color, optional, default marker="x", alpha=0.6, label="clipped arc lines", ) bottomleft.set_ylabel(f"y-shift SD (px)", fontsize=12) bottomleft.set_xlabel(f"x-shift SD (px)", fontsize=12) bottomleft.tick_params(axis="both", which="major", labelsize=9) bottomleft.legend(loc="upper right", bbox_to_anchor=(1.0, -0.05), fontsize=4) hist( lineGroups[(lineGroups["sigma_clipped_scatter"] == False)]["xy_diff"], bins="scott", ax=bottomright, histtype="stepfilled", alpha=0.7, density=True, ) bottomright.set_xlabel("xy residual") bottomright.tick_params(axis="both", which="major", labelsize=9) # REMOVE THE CLIPPED DATA BEFORE CLIPPING ON FWHM mask = lineGroups["sigma_clipped_scatter"] == True dropGroups = lineGroups.loc[mask] setsToDrop = dropGroups[["wavelength", "order"]] s = orderPixelTable[["wavelength", "order"]].merge(setsToDrop, indicator=True, how="left") s["dropped"] = False s.loc[(s["_merge"] == "both"), "dropped"] = True orderPixelTable["droppedOnScatter"] = s["dropped"].values orderPixelTable.loc[(orderPixelTable["droppedOnScatter"] == True), "dropped"] = True # SIGMA-CLIP THE DATA ON FWHM lineGroups = ( orderPixelTable.loc[(orderPixelTable["dropped"] == False)][columnsNoStrings] .groupby(["wavelength", "order"]) .mean() ) lineGroups = lineGroups.reset_index() masked_residuals = sigma_clip( lineGroups["fwhm_pin_px"], sigma_lower=2.5, sigma_upper=5, maxiters=3, cenfunc="median", stdfunc="mad_std" ) lineGroups["sigma_clipped_fwhm"] = masked_residuals.mask lineGroups["sigma_clipped"] = masked_residuals.mask toprow.scatter( x=lineGroups["wavelength"], # numpy array of x-points y=lineGroups["fwhm_pin_px"], # numpy array of y-points # 1 number or array of areas for each datapoint (i.e. point size) s=1, c="black", # color or sequence of color, optional, default marker="x", alpha=0.6, label="measured pinhole lines", ) toprow.scatter( x=lineGroups[(lineGroups["sigma_clipped_fwhm"] == True)]["wavelength"], # numpy array of x-points y=lineGroups[(lineGroups["sigma_clipped_fwhm"] == True)]["fwhm_pin_px"], # numpy array of y-points # 1 number or array of areas for each datapoint (i.e. point size) s=5, c="red", # color or sequence of color, optional, default marker="x", alpha=0.6, label="clipped pinhole lines", ) toprow.set_ylabel(f"fwhm (px)", fontsize=12) toprow.set_xlabel(f"wavelength (nm)", fontsize=12) toprow.tick_params(axis="both", which="major", labelsize=9) toprow.legend(loc="upper right", bbox_to_anchor=(1.0, -0.05), fontsize=4) # REMOVE THE CLIPPED DATA BEFORE CLIPPING ON FLUX mask = lineGroups["sigma_clipped_fwhm"] == True dropGroups = lineGroups.loc[mask] setsToDrop = dropGroups[["wavelength", "order"]] s = orderPixelTable[["wavelength", "order"]].merge(setsToDrop, indicator=True, how="left") s["dropped"] = False s.loc[(s["_merge"] == "both"), "dropped"] = True orderPixelTable["droppedOnFWHM"] = s["dropped"].values orderPixelTable.loc[(orderPixelTable["droppedOnFWHM"] == True), "dropped"] = True # SIGMA-CLIP THE DATA ON FLUX lineGroups = lineGroups.loc[~mask] masked_residuals = sigma_clip( lineGroups["flux"], sigma_lower=5, sigma_upper=5, maxiters=5, cenfunc="mean", stdfunc="std" ) lineGroups["sigma_clipped_flux"] = masked_residuals.mask midrow.scatter( x=lineGroups["wavelength"], # numpy array of x-points y=lineGroups["flux"], # numpy array of y-points # 1 number or array of areas for each datapoint (i.e. point size) s=1, c="black", # color or sequence of color, optional, default marker="x", alpha=0.6, label="pinhole flux", ) midrow.set_ylabel(f"pinhole flux", fontsize=12) midrow.set_xlabel(f"wavelength (nm)", fontsize=12) midrow.tick_params(axis="both", which="major", labelsize=9) midrow.legend(loc="upper right", bbox_to_anchor=(1.0, -0.05), fontsize=4) midrow.scatter( x=lineGroups[(lineGroups["sigma_clipped_flux"] == True)]["wavelength"], # numpy array of x-points y=lineGroups[(lineGroups["sigma_clipped_flux"] == True)]["flux"], # numpy array of y-points # 1 number or array of areas for each datapoint (i.e. point size) s=5, c="red", # color or sequence of color, optional, default marker="x", alpha=0.6, label="clipped pinhole lines", ) midrow.legend(loc="upper right", bbox_to_anchor=(1.0, -0.05), fontsize=4) # REMOVE THE CLIPPED DATA BEFORE CLIPPING ON PEAK mask = lineGroups["sigma_clipped_flux"] == True dropGroups = lineGroups.loc[mask] setsToDrop = dropGroups[["wavelength", "order"]] s = orderPixelTable[["wavelength", "order"]].merge(setsToDrop, indicator=True, how="left") s["dropped"] = False s.loc[(s["_merge"] == "both"), "dropped"] = True orderPixelTable["droppedOnFlux"] = s["dropped"].values orderPixelTable.loc[(orderPixelTable["droppedOnFlux"] == True), "dropped"] = True # SIGMA-CLIP THE DATA ON FLUX lineGroups = lineGroups.loc[~mask] lineGroups["peak"] = lineGroups["peak"] / lineGroups["flux"] masked_residuals = sigma_clip( lineGroups["peak"], sigma_lower=5000, sigma_upper=7, maxiters=5, cenfunc="mean", stdfunc="std" ) lineGroups["sigma_clipped_peak"] = masked_residuals.mask midrow2.scatter( x=lineGroups["wavelength"], # numpy array of x-points y=lineGroups["peak"], # numpy array of y-points # 1 number or array of areas for each datapoint (i.e. point size) s=1, c="black", # color or sequence of color, optional, default marker="x", alpha=0.6, label="pinhole peak flux / mean flux", ) midrow2.set_ylabel(f"pinhole peak flux", fontsize=12) midrow2.set_xlabel(f"wavelength (nm)", fontsize=12) midrow2.tick_params(axis="both", which="major", labelsize=9) midrow2.legend(loc="upper right", bbox_to_anchor=(1.0, -0.05), fontsize=4) midrow2.scatter( x=lineGroups[(lineGroups["sigma_clipped_peak"] == True)]["wavelength"], # numpy array of x-points y=lineGroups[(lineGroups["sigma_clipped_peak"] == True)]["peak"], # numpy array of y-points # 1 number or array of areas for each datapoint (i.e. point size) s=5, c="red", # color or sequence of color, optional, default marker="x", alpha=0.6, label="clipped pinhole lines", ) midrow2.legend(loc="upper right", bbox_to_anchor=(1.0, -0.05), fontsize=4) # REMOVE THE CLIPPED DATA mask = lineGroups["sigma_clipped_peak"] == True dropGroups = lineGroups.loc[mask] setsToDrop = dropGroups[["wavelength", "order"]] s = orderPixelTable[["wavelength", "order"]].merge(setsToDrop, indicator=True, how="left") s["dropped"] = False s.loc[(s["_merge"] == "both"), "dropped"] = True orderPixelTable["droppedOnPeak"] = s["dropped"].values orderPixelTable.loc[(orderPixelTable["droppedOnPeak"] == True), "dropped"] = True if self.debug: plt.show() plt.close("all") self.log.debug("completed the ``_clip_on_measured_line_metrics`` method") return orderPixelTable
[docs] def update_static_line_list_detector_positions(self, originalOrderPixelTable, dispersionMapPath): """*using a first pass dispersion solution, update the original static line list* **Key Arguments:** - `originalOrderPixelTable` -- original order pixel table - `dispersionMapPath` -- path to the first pass dispersion solution **Return:** - `updatedLineList` -- updated static line list """ self.log.debug("starting the ``update_static_line_list_detector_positions`` method") from soxspipe.commonutils import dispersion_map_to_pixel_arrays from soxspipe.commonutils.toolkit import read_spectral_format import pandas as pd from astropy.table import Table # GET UNIQUE VALUES OF order AND WAVELENGTH uniquecolNames = originalOrderPixelTable[["order", "wavelength"]].drop_duplicates() orders = uniquecolNames["order"] wavelengths = uniquecolNames["wavelength"] slit_positions = self.uniqueSlitPos slit_indexes = list(range(0, len(self.uniqueSlitPos), 1)) dfCollection = [] for si, sp in zip(slit_indexes, slit_positions): myDict = { "order": orders, "wavelength": wavelengths, "slit_position": [sp] * len(orders), "slit_index": [si] * len(orders), } orderPixelTable = pd.DataFrame(myDict) floats = ["wavelength", "slit_position"] ints = ["order", "slit_index"] for f in floats: orderPixelTable[f] = orderPixelTable[f].astype(float) for i in ints: orderPixelTable[i] = orderPixelTable[i].astype(int) orderPixelTable = dispersion_map_to_pixel_arrays( log=self.log, dispersionMapPath=dispersionMapPath, orderPixelTable=orderPixelTable, removeOffDetectorLocation=False, ) orderPixelTable.rename(columns={"fit_x": "detector_x", "fit_y": "detector_y"}, inplace=True) orderPixelTable = orderPixelTable[ ["wavelength", "order", "slit_index", "slit_position", "detector_x", "detector_y"] ] dfCollection.append(orderPixelTable) updatedLineList = pd.concat(dfCollection, ignore_index=True) self.log.debug("completed the ``update_static_line_list_detector_positions`` method") return updatedLineList
[docs] def create_new_static_line_list(self, dispersionMapPath): """*using a first pass dispersion solution, use a line atlas to generate a more accurate and more complete static line list* **Key Arguments:** - `dispersionMapPath` -- path to the first pass dispersion solution **Return:** - `newPredictedLineList` -- a new predicted line list (to replace the static calibration line-list) """ self.log.debug("starting the ``create_new_static_line_list`` method") from soxspipe.commonutils import dispersion_map_to_pixel_arrays from soxspipe.commonutils.toolkit import read_spectral_format import pandas as pd from astropy.table import Table dp = self.detectorParams # READ THE SPECTRAL FORMAT TABLE TO DETERMINE THE LIMITS OF THE TRACES orderNums, waveLengthMin, waveLengthMax = read_spectral_format( log=self.log, settings=self.settings, arm=self.arm ) # FIND THE LINE ATLAS calibrationRootPath = get_calibrations_path(log=self.log, settings=self.settings) lineAtlas = calibrationRootPath + "/" + dp["line-atlas"] # LINE LIST TO PANDAS DATAFRAME lineAtlas = Table.read(lineAtlas, format="fits") lineAtlas = lineAtlas.to_pandas() # CLEAN UP LINE LIST DATA lineAtlas["ion"] = lineAtlas["ion"].str.decode("ascii") lineAtlas["source"] = lineAtlas["source"].str.decode("ascii") lineAtlas["wave"] = lineAtlas["wave"].round(5) # FILTER DATA FRAME # FIRST CREATE THE MASK # GET UNIQUE VALUES IN COLUMN uniqueions = lineAtlas["ion"].unique() # # mask = (lineAtlas['ion'].isin(["XeI", 'UNK'])) # mask = (lineAtlas['ion'].isin(["HgI", 'UNK'])) # # mask = (lineAtlas['ion'].isin(["NeI", "NeII", 'UNK'])) # # mask = (lineAtlas['ion'].isin(["Ar", "ArI", "ArII", "ArIII", 'UNK'])) # lineAtlas = lineAtlas.loc[mask] # ORDER BY MOST INTENSE LINES lineAtlas.sort_values(["amplitude"], ascending=[False], inplace=True) lineAtlas = lineAtlas.head(500) # try: # # FILTER DATAFRAME TO EXCLUDE source = "NIST" and amplitude > 1000 # mask = (lineAtlas['source'] != "NIST") & (lineAtlas['amplitude'] > 1000) # lineAtlas = lineAtlas.loc[mask] # except: # pass dfCollection = [] for o, wmin, wmax in zip(orderNums, waveLengthMin, waveLengthMax): wrange = wmax - wmin wmin -= wrange / 5 wmax += wrange / 5 # FILTER DATA FRAME # FIRST CREATE THE MASK mask = lineAtlas["wave"].between(wmin, wmax) filteredDf = lineAtlas.loc[mask] filteredDf = filteredDf.drop_duplicates(subset=["wave", "ion"]) wave = filteredDf["wave"] ion = filteredDf["ion"] if not self.firstGuessMap: slit_positions = [0.0] slit_indexes = [4] else: slit_positions = self.uniqueSlitPos slit_indexes = list(range(0, len(self.uniqueSlitPos), 1)) # xpd-update-filter-dataframe-column-values for si, sp in zip(slit_indexes, slit_positions): myDict = { "order": [o] * len(wave), "wavelength": wave, "slit_position": [sp] * len(wave), "slit_index": [si] * len(wave), "ion": ion, } orderPixelTable = pd.DataFrame(myDict) floats = ["wavelength", "slit_position"] ints = ["order", "slit_index"] for f in floats: orderPixelTable[f] = orderPixelTable[f].astype(float) for i in ints: orderPixelTable[i] = orderPixelTable[i].astype(int) orderPixelTable = dispersion_map_to_pixel_arrays( log=self.log, dispersionMapPath=dispersionMapPath, orderPixelTable=orderPixelTable, removeOffDetectorLocation=False, ) orderPixelTable.rename(columns={"fit_x": "detector_x", "fit_y": "detector_y"}, inplace=True) orderPixelTable = orderPixelTable[ ["ion", "wavelength", "order", "slit_index", "slit_position", "detector_x", "detector_y"] ] dfCollection.append(orderPixelTable) newPredictedLineList = pd.concat(dfCollection, ignore_index=True) from astropy.table import Table t = Table.from_pandas(newPredictedLineList) # t.write("Xe.fits", overwrite=True) t.write("Hg.fits", overwrite=True) # t.write("Ne.fits", overwrite=True) # t.write("Ar.fits", overwrite=True) # sys.exit(0) self.log.debug("completed the ``create_new_static_line_list`` method") return newPredictedLineList
# use the tab-trigger below for new method # xt-class-method
[docs] def measure_line_position( stampInfo, log, windowHalf, iraf, sigmaLimit, iteration, brightest=False, exclude_border=False, multipinhole=False, returnAll=True, debug=False, ): """*measure the line position on a given stamp* **Key Arguments:** - `stampInfo` -- stamp pixels, xlow, xup, ylow, yup - `log` -- logger - `windowHalf` -- half of the stamp side - `iraf` -- run IRAF source detection to get FWHM? - `sigmaLimit` -- stamp source detection minimum sigma - `brightest` -- find the brightest source? - `multipinhole` -- is this a multipinhole frame? **Returns:** - list of detected line positions with properties """ log.debug("starting the ``measure_line_position`` function") import numpy as np from photutils import DAOStarFinder, IRAFStarFinder from astropy.stats import sigma_clipped_stats import logging # FIX ASTROPY LOGGING LEVEL RESET logging.getLogger().setLevel(logging.INFO + 5) # EXTRACT STAMP INFORMATION stamp, xlow, xup, ylow, yup = stampInfo[0], stampInfo[1], stampInfo[2], stampInfo[3], stampInfo[4] # CONVERT CCDDATA TO MASKED NUMPY ARRAY stamp = np.ma.array(stamp.data, mask=stamp.mask) # CALCULATE STAMP STATISTICS FOR DETECTION THRESHOLD mean, median, std = sigma_clipped_stats(stamp, sigma=3.0, stdfunc="mad_std", cenfunc="median") # PREPARE ITERATION TEXT FOR LOGGING iterationText = f", iter #{iteration}" if iteration is not False else "" # DETECT SOURCES USING DAO STARFINDER try: daofind = DAOStarFinder( fwhm=2.0, threshold=sigmaLimit * std, roundlo=-2.0, roundhi=2.0, sharplo=-0.5, sharphi=2.0, exclude_border=exclude_border, ) # SUBTRACT MEDIAN FOR BETTER DETECTION IN LOW SIGNAL IMAGES sources = daofind(stamp.data - median, mask=stamp.mask) except Exception as e: sources = None # INITIALIZE DETECTION VARIABLES old_resid = windowHalf * 4 selectedSource = None observed_x = np.nan observed_y = np.nan fwhm = np.nan old_detectionSigma = 0.0 predictedLines = [] keepValues = ["sharpness", "roundness1", "roundness2", "npix", "sky", "peak", "flux"] # PROCESS DETECTED SOURCES if sources: if returnAll: # RETURN ALL DETECTED SOURCES for source in sources: predictedLine = { "observed_x": source["xcentroid"] + xlow, "observed_y": source["ycentroid"] + ylow, "stamp_x": source["xcentroid"], "stamp_y": source["ycentroid"], "detectionSigma": source["peak"] / std, "fwhm_pin_px": np.nan, } # ADD PHOTOMETRIC PROPERTIES for k in keepValues: predictedLine[k] = source[k] predictedLines.append(predictedLine) else: # SELECT BEST SINGLE SOURCE (CLOSEST TO CENTER OR BRIGHTEST) if len(sources) > 1: for source in sources: tmp_x = source["xcentroid"] tmp_y = source["ycentroid"] new_resid = np.sqrt((windowHalf - tmp_x) ** 2 + (windowHalf - tmp_y) ** 2) detectionSigma = source["peak"] / std # SELECT BY BRIGHTNESS OR PROXIMITY TO CENTER if (brightest and detectionSigma > old_detectionSigma) or (not brightest and new_resid < old_resid): observed_x = tmp_x + xlow observed_y = tmp_y + ylow stamp_x = tmp_x stamp_y = tmp_y old_resid = new_resid selectedSource = source old_detectionSigma = detectionSigma else: # ONLY ONE SOURCE DETECTED observed_x = sources[0]["xcentroid"] + xlow observed_y = sources[0]["ycentroid"] + ylow stamp_x = sources[0]["xcentroid"] stamp_y = sources[0]["ycentroid"] selectedSource = sources[0] detectionSigma = sources[0]["peak"] / std # BUILD PREDICTED LINE DICTIONARY predictedLine = { "observed_x": observed_x, "observed_y": observed_y, "stamp_x": stamp_x, "stamp_y": stamp_y, "detectionSigma": detectionSigma, } # ADD PHOTOMETRIC PROPERTIES for k in keepValues: predictedLine[k] = selectedSource[k] # OPTIONALLY MEASURE FWHM USING IRAF STARFINDER if iraf: try: iraf_find = IRAFStarFinder( fwhm=3.0, threshold=1 * std, roundlo=-2.0, roundhi=2.0, sharplo=-1, sharphi=3.0, exclude_border=True, xycoords=[(stamp_x, stamp_y)], ) iraf_sources = iraf_find(stamp) predictedLine["fwhm_pin_px"] = iraf_sources["fwhm"][0] except Exception: predictedLine["fwhm_pin_px"] = np.nan else: predictedLine["fwhm_pin_px"] = np.nan predictedLines.append(predictedLine) # DEBUG VISUALIZATION (DISABLED) if False and debug and iteration == 0: import matplotlib.pyplot as plt plt.clf() plt.imshow(stamp) plt.scatter(0, 0, marker="x", s=30) plt.scatter(observed_x - xlow, observed_y - ylow, s=30) plt.text( windowHalf - 2, windowHalf - 2, f"{observed_x-xlow:0.2f},{observed_y - ylow:0.2f}", fontsize=16, c="black", verticalalignment="bottom", ) plt.text( 2, 2, f"{sigmaLimit}$\\sigma$, {iterationText}\n,{detectionSigma:0.1f}", fontsize=16, c="white", verticalalignment="bottom", ) plt.show() log.debug("completed the ``measure_line_position`` function") return predictedLines
[docs] def straighten_mph_sets(group): """ *Straighten multi-pinhole sets by projecting points onto fitted line* **Key Arguments:** - ``group`` -- dataframe group containing observed_x and observed_y coordinates **Returns:** - ``group`` -- group with added tilt_corrected_x and tilt_corrected_y columns """ import numpy as np # EXTRACT OBSERVED COORDINATES x = group["observed_x"].values y = group["observed_y"].values # FIT LINEAR MODEL: y = m*x + c A = np.vstack([x, np.ones_like(x)]).T m, c = np.linalg.lstsq(A, y, rcond=None)[0] # PROJECT EACH POINT ONTO THE FITTED LINE # For point (x0, y0), find closest point (xp, yp) on line y = m*x + c # Perpendicular projection formulas: # xp = (m*(y0 - c) + x0) / (m**2 + 1) # yp = m*xp + c xp = (m * (y - c) + x) / (m**2 + 1) yp = m * xp + c # ADD TILT-CORRECTED COORDINATES TO GROUP group = group.copy() group["tilt_corrected_x"] = xp group["tilt_corrected_y"] = yp # DEBUG VISUALIZATION (DISABLED) if False: import matplotlib.pyplot as plt plt.figure(figsize=(6, 4)) plt.scatter(x, y, color="blue", label="Original points") plt.plot(x, m * x + c, color="green", label="Fitted line") plt.scatter(xp, yp, color="red", marker="x", label="Tilt-corrected points") if "detector_x_shifted" in group.columns and "detector_y_shifted" in group.columns: plt.scatter( group["detector_x_shifted"], group["detector_y_shifted"], color="orange", marker="^", label="Detector shifted", ) # LABEL EACH POINT WITH SLIT INDEX for xi, yi, slit_idx in zip(x, y, group["slit_index"].values): plt.text(xi, yi, str(slit_idx), fontsize=8, color="black", ha="center", va="bottom") plt.xlabel("observed_x") plt.ylabel("observed_y") plt.legend() plt.title("Straighten Multi-Pinhole Set") plt.tight_layout() plt.text( 0.95, 0.95, f"N={len(x)}", transform=plt.gca().transAxes, fontsize=10, verticalalignment="top", color="blue" ) plt.show() return group
def _plot_slit_index_comparisons(df): """ Plot slit_index vs xy_diff, x_diff, and y_diff, color-coded by slit_index. Replace legend labels with mean and std for each slit_index. The order number is shown as the figure title. Each panel title includes the global mean and std for that metric. Adds a fourth panel: scatter plot of x_diff vs y_diff, color-coded by slit_index. """ import numpy as np import matplotlib.pyplot as plt slit_indexes = np.sort(df["slit_index"].unique()) try: order_num = df["order"].iloc[0] if "order" in df.columns else None except: return cmap = plt.get_cmap("tab10") colors = {idx: cmap(i % 10) for i, idx in enumerate(slit_indexes)} # Compute global mean and std for each metric global_xy_mean, global_xy_std = df["xy_diff"].mean(), df["xy_diff"].std() global_x_mean, global_x_std = df["x_diff"].mean(), df["x_diff"].std() global_y_mean, global_y_std = df["y_diff"].mean(), df["y_diff"].std() fig, axes = plt.subplots(1, 4, figsize=(24, 5), sharey=False) handles = [[], [], [], []] labels = [[], [], [], []] for idx in slit_indexes: mask = df["slit_index"] == idx xy_mean, xy_std = df.loc[mask, "xy_diff"].mean(), df.loc[mask, "xy_diff"].std() x_mean, x_std = df.loc[mask, "x_diff"].mean(), df.loc[mask, "x_diff"].std() y_mean, y_std = df.loc[mask, "y_diff"].mean(), df.loc[mask, "y_diff"].std() h0 = axes[0].scatter([idx] * np.sum(mask), df.loc[mask, "xy_diff"], color=colors[idx], alpha=0.7) h1 = axes[1].scatter([idx] * np.sum(mask), df.loc[mask, "x_diff"], color=colors[idx], alpha=0.7) h2 = axes[2].scatter([idx] * np.sum(mask), df.loc[mask, "y_diff"], color=colors[idx], alpha=0.7) h3 = axes[3].scatter( df.loc[mask, "x_diff"], df.loc[mask, "y_diff"], color=colors[idx], alpha=0.7, label=f"slit_index {idx}" ) handles[0].append(h0) handles[1].append(h1) handles[2].append(h2) handles[3].append(h3) labels[0].append(f"slit_index {idx}: μ={xy_mean:.3f}, σ={xy_std:.3f}") labels[1].append(f"slit_index {idx}: μ={x_mean:.3f}, σ={x_std:.3f}") labels[2].append(f"slit_index {idx}: μ={y_mean:.3f}, σ={y_std:.3f}") labels[3].append(f"slit_index {idx}") axes[0].set_title(f"slit_index vs xy_diff\nGlobal μ={global_xy_mean:.3f}, σ={global_xy_std:.3f}") axes[1].set_title(f"slit_index vs x_diff\nGlobal μ={global_x_mean:.3f}, σ={global_x_std:.3f}") axes[2].set_title(f"slit_index vs y_diff\nGlobal μ={global_y_mean:.3f}, σ={global_y_std:.3f}") axes[3].set_title("x_diff vs y_diff (color: slit_index)") axes[3].set_xlabel("x_diff") axes[3].set_ylabel("y_diff") for i, ax in enumerate(axes): if i < 3: ax.set_xlabel("slit_index") ax.set_ylabel("difference") ax.legend(handles[i], labels[i], title="slit_index", bbox_to_anchor=(1.05, 1), loc="upper left") else: ax.legend(handles[i], labels[i], title="slit_index", bbox_to_anchor=(1.05, 1), loc="upper left") if order_num is not None: fig.suptitle(f"Order {order_num}. {len(df.index)} points", fontsize=16) plt.tight_layout() plt.show() return
[docs] def find_largest_cluster_center(x, y, eps=2.0, min_samples=5): """ *Find the center of the largest cluster in (x, y) data using DBSCAN* **Key Arguments:** - ``x`` -- x-coordinates of points - ``y`` -- y-coordinates of points - ``eps`` -- maximum distance between points in a cluster - ``min_samples`` -- minimum samples required to form a cluster **Returns:** - ``center_x, center_y`` -- coordinates of largest cluster center, or (None, None) if no cluster found """ import numpy as np from sklearn.cluster import DBSCAN # STACK COORDINATES AND RUN DBSCAN CLUSTERING points = np.column_stack((x, y)) clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(points) labels = clustering.labels_ # FIND LARGEST CLUSTER (EXCLUDING NOISE LABEL -1) unique, counts = np.unique(labels[labels != -1], return_counts=True) if len(counts) == 0: # NO CLUSTERS FOUND - RETURN NONE return None, None # CALCULATE CENTER OF LARGEST CLUSTER largest_cluster = unique[np.argmax(counts)] mask = labels == largest_cluster center_x = np.mean(x[mask]) center_y = np.mean(y[mask]) return center_x, center_y