Source code for soxspipe.commonutils.dispersion_map_to_pixel_arrays

#!/usr/bin/env python
# encoding: utf-8
"""
*use a first-guess dispersion map to convert wavelengths to pixels*

Author
: David Young

Date Created
: April 15, 2021
"""

from soxspipe.commonutils.polynomials import chebyshev_order_wavelength_polynomials
from os.path import expanduser
from fundamentals import tools
from builtins import object
import sys
import os

os.environ["TERM"] = "vt100"


[docs] def dispersion_map_to_pixel_arrays( log, dispersionMapPath, orderPixelTable, removeOffDetectorLocation=True, trimColumns=False ): """*Use a dispersion solution to convert wavelength, slit-position and echelle order numbers to X,Y pixel positions.* Return a line-list with x,y fits given a first guess dispersion map.* **Key Arguments:** - `log` -- logger - `dispersionMapPath` -- path to the dispersion map - `orderPixelTable` -- a data-frame including 'order', 'wavelength' and 'slit_pos' columns - `removeOffDetectorLocation` -- if data points are found to lie off the detector plane then remove them from the results. Default *True* - `trimColumns` -- if True, only return the specified columns. Default *False* **Usage:** ```python from soxspipe.commonutils import dispersion_map_to_pixel_arrays myDict = { "order": [11, 11, 11], "wavelength": [850.3, 894.3, 983.2], "slit_position": [0, 0, 0] } orderPixelTable = pd.DataFrame(myDict) orderPixelTable = dispersion_map_to_pixel_arrays( log=log, dispersionMapPath="/path/to/map.csv", orderPixelTable=orderPixelTable ) ``` """ log.debug("starting the ``dispersion_map_to_pixel_arrays`` function") from astropy.table import Table import math import numpy as np # READ THE FILE home = expanduser("~") dispersion_map = dispersionMapPath.replace("~", home) # SPEC FORMAT TO PANDAS DATAFRAME dat = Table.read(dispersion_map, format="fits") tableData = dat.to_pandas() # READ IN THE X- AND Y- GENERATING POLYNOMIALS FROM DISPERSION MAP FILE coeff = {} poly = {} check = 1 for index, row in tableData.iterrows(): axis = row["axis"].decode("utf-8") orderDeg = int(row["order_deg"]) wavelengthDeg = int(row["wavelength_deg"]) slitDeg = int(row["slit_deg"]) # print(axis, orderDeg, wavelengthDeg, slitDeg) if check: for i in range(0, orderDeg + 1): orderPixelTable[f"order_pow_{axis}_{i}"] = orderPixelTable["order"].pow(i) for j in range(0, wavelengthDeg + 1): orderPixelTable[f"wavelength_pow_{axis}_{j}"] = orderPixelTable["wavelength"].pow(j) for k in range(0, slitDeg + 1): orderPixelTable[f"slit_position_pow_{axis}_{k}"] = orderPixelTable["slit_position"].pow(k) # check = 0 coeff[axis] = [ float(v) for k, v in row.items() if k not in ["axis", "order_deg", "wavelength_deg", "slit_deg"] and not math.isnan(v) ] poly[axis] = chebyshev_order_wavelength_polynomials( log=log, orderDeg=orderDeg, wavelengthDeg=wavelengthDeg, slitDeg=slitDeg, exponentsIncluded=True, axis=axis ).poly # CONVERT THE ORDER-SORTED WAVELENGTH ARRAYS INTO ARRAYS OF PIXEL TUPLES orderPixelTable["fit_x"] = poly["x"](orderPixelTable, *coeff["x"]) orderPixelTable["fit_y"] = poly["y"](orderPixelTable, *coeff["y"]) orderPixelTable["fit_x"] = orderPixelTable["fit_x"].astype(np.float32) orderPixelTable["fit_y"] = orderPixelTable["fit_y"].astype(np.float32) if removeOffDetectorLocation: # FILTER DATA FRAME # FIRST CREATE THE MASK mask = (orderPixelTable["fit_x"] > 0) & (orderPixelTable["fit_y"] > 0) orderPixelTable = orderPixelTable.loc[mask] if trimColumns: orderPixelTable = orderPixelTable[["order", "wavelength", "slit_position", "fit_x", "fit_y"]] log.debug("completed the ``dispersion_map_to_pixel_arrays`` function") return orderPixelTable
[docs] def get_cached_coeffs(log, arm, settings, recipeName, orderDeg, wavelengthDeg, slitDeg, reset=False): """*find cached coefficients (if they exist)* Return a line-list with x,y fits given a first guess dispersion map.* **Key Arguments:** - ``log`` -- logger - ``arm`` -- the spectrograph arm. - ``settings`` pipeline settings dictionary - ``recipeName`` -- the name of the recipe. - ``orderDeg`` -- the order deg - ``wavelengthDeg`` -- wavelength degree - ``slitDeg`` -- slit degree - ``reset`` -- always reset the coeffs. Don't use cached. Default *False* **Usage:** ```python from soxspipe.commonutils import get_cached_coeffs xcoeff, ycoeff = get_cached_coeffs( log=log, arm=arm, settings=settings, recipeName=recipeName, orderDeg=orderDeg, wavelengthDeg=wavelengthDeg, slitDeg=slitDeg ) ``` """ log.debug("starting the ``get_cached_coeffs`` function") from astropy.table import Table import math import numpy as np # READ THE FILE home = expanduser("~") cache = settings["workspace-root-dir"].replace("~", home) + "/.cache" polyOrders = [orderDeg, wavelengthDeg, slitDeg] if isinstance(orderDeg, list): merged_list = [] for sublist in polyOrders: merged_list.extend(sublist) polyOrders = merged_list polyOrders[:] = [str(l) for l in polyOrders] polyOrders = "".join(polyOrders) filename = f"{recipeName}_{arm}_{polyOrders}.fits" filePath = f"{cache}/{filename}" coeff = {} if os.path.exists(filePath) and reset == False: dispersion_map = filePath # SPEC FORMAT TO PANDAS DATAFRAME dat = Table.read(dispersion_map, format="fits") tableData = dat.to_pandas() # READ IN THE X- AND Y- COEFF FROM DISPERSION MAP FILE for index, row in tableData.iterrows(): axis = row["axis"].decode("utf-8") orderDeg = int(row["order_deg"]) wavelengthDeg = int(row["wavelength_deg"]) slitDeg = int(row["slit_deg"]) coeff[axis] = [ float(v) for k, v in row.items() if k not in ["axis", "order_deg", "wavelength_deg", "slit_deg"] and not math.isnan(v) ] else: if isinstance(orderDeg, list): coeff["x"] = np.ones((orderDeg[0] + 1) * (wavelengthDeg[0] + 1) * (slitDeg[0] + 1), dtype=float) coeff["y"] = np.ones((orderDeg[1] + 1) * (wavelengthDeg[1] + 1) * (slitDeg[1] + 1), dtype=float) else: coeff["x"] = np.ones((orderDeg + 1) * (wavelengthDeg + 1) * (slitDeg + 1), dtype=float) coeff["y"] = np.ones((orderDeg + 1) * (wavelengthDeg + 1) * (slitDeg + 1), dtype=float) log.debug("completed the ``get_cached_coeffs`` function") return coeff["x"], coeff["y"]