#!/usr/bin/env python
# encoding: utf-8
"""
*fit and subtract background flux from scattered light from frame*
Author
: David Young
Date Created
: June 3, 2021
"""
from soxspipe.commonutils import keyword_lookup
from os.path import expanduser
from soxspipe.commonutils.toolkit import quicklook_image
from soxspipe.commonutils.toolkit import unpack_order_table
from fundamentals import tools
from builtins import object
import sys
import os
os.environ["TERM"] = "vt100"
[docs]
class subtract_background(object):
"""
*fit and subtract background flux from scattered light from frame*
**Key Arguments:**
- ``log`` -- logger
- ``settings`` -- the settings dictionary
- ``frame`` -- the frame to subtract background light from
- ``recipeName`` -- name of the parent recipe
- ``sofName`` -- the sof file name given to the parent recipe
- ``orderTable`` -- the order geometry table
- ``qcTable`` -- the data frame to collect measured QC metrics
- ``productsTable`` -- the data frame to collect output products
- ``lamp`` -- needed for UVB flats
- ``startNightDate`` -- YYYY-MM-DD date of the observation night. Default ""
**Usage:**
To setup your logger, settings and database connections, please use the ``fundamentals`` package (see tutorial here https://fundamentals.readthedocs.io/en/master/initialisation.html).
To fit and subtract the background from an image:
```python
from soxspipe.commonutils import subtract_background
background = subtract_background(
log=log,
frame=myCCDDataObject,
orderTable="/path/to/orderTable",
settings=settings
)
backgroundFrame, backgroundSubtractedFrame = background.subtract()
```
"""
def __init__(
self,
log,
frame,
orderTable,
sofName=False,
recipeName=False,
settings=False,
qcTable=False,
productsTable=False,
lamp="",
startNightDate="",
):
self.log = log
log.debug("instantiating a new 'subtract_background' object")
self.settings = settings
self.sofName = sofName
self.recipeName = recipeName
self.frame = frame
self.orderTable = orderTable
self.qc = qcTable
self.products = productsTable
self.lamp = lamp
self.startNightDate = startNightDate
from soxspipe.commonutils import detector_lookup
# KEYWORD LOOKUP OBJECT - LOOKUP KEYWORD FROM DICTIONARY IN RESOURCES
# FOLDER
self.kw = keyword_lookup(log=self.log, settings=self.settings).get
kw = self.kw
self.arm = frame.header[kw("SEQ_ARM")]
self.dateObs = frame.header[kw("DATE_OBS")]
self.inst = frame.header[kw("INSTRUME")]
# DETECTOR PARAMETERS LOOKUP OBJECT
self.detectorParams = detector_lookup(log=log, settings=settings).get(self.arm)
# SET IMAGE ORIENTATION
if self.detectorParams["dispersion-axis"] == "x":
self.axisA = "x"
self.axisB = "y"
else:
self.axisA = "y"
self.axisB = "x"
from soxspipe.commonutils.toolkit import utility_setup
self.qcDir, self.productDir = utility_setup(
log=self.log, settings=settings, recipeName=recipeName, startNightDate=startNightDate
)
quicklook_image(
log=self.log,
CCDObject=self.frame,
show=False,
ext="data",
stdWindow=3,
surfacePlot=True,
title="Initial input frame needing scattered light subtraction",
)
return None
[docs]
def subtract(self):
"""
*fit and subtract background light from frame*
**Return:**
- ``backgroundSubtractedFrame`` -- a CCDData object of the original input frame with fitted background light subtracted
"""
self.log.debug("starting the ``subtract`` method")
import numpy as np
import pandas as pd
from soxspipe.commonutils import toolkit
kw = self.kw
imageType = self.frame.header[kw("DPR_TYPE")].replace(",", "-")
imageTech = self.frame.header[kw("DPR_TECH")].replace(",", "-")
imageCat = self.frame.header[kw("DPR_CATG")].replace(",", "-")
self.log.print(
f"\n# FITTING AND SUBTRACTING SCATTERED LIGHT BACKGROUND FROM {self.arm} {imageCat} {imageTech} {imageType} FRAME"
)
binx = 1
biny = 1
try:
binx = self.frame.header[self.kw("WIN_BINX")]
biny = self.frame.header[self.kw("WIN_BINY")]
except:
pass
# UNPACK THE ORDER TABLE
orderPolyTable, orderPixelTable, orderMetaTable = unpack_order_table(
log=self.log, orderTablePath=self.orderTable, binx=binx, biny=biny, extend=4
)
originalMask = np.copy(self.frame.mask)
quicklook_image(
log=self.log, CCDObject=self.frame.data, show=False, ext=None, surfacePlot=True, title="Initial frame"
)
# MASK THE INNER ORDER AREA (AND BAD PIXELS)
self.mask_order_locations(orderPixelTable)
quicklook_image(
log=self.log,
CCDObject=self.frame,
show=False,
ext=None,
surfacePlot=True,
title="Initial input frame with order pixels masked",
)
backgroundFrame = self.create_background_image(
rowFitOrder=self.settings["background-subtraction"]["bspline-deg"],
gaussianSigma=self.settings["background-subtraction"]["gaussian-blur-sigma"],
)
toolkit.frame_to_32(backgroundFrame)
# REPLACE MASK
self.frame.mask = originalMask
backgroundSubtractedFrame = self.frame.subtract(backgroundFrame)
backgroundSubtractedFrame.header = self.frame.header
toolkit.frame_to_32(backgroundSubtractedFrame)
# GET FILENAME FOR THE RESIDUAL PLOT
saveToPath = False
if self.sofName:
backgroundQCImage = self.sofName + f"_BKGROUND.pdf"
saveToPath = self.qcDir + "/" + backgroundQCImage
quicklook_image(
log=self.log,
CCDObject=backgroundFrame,
show=False,
ext="data",
stdWindow=9,
surfacePlot=False,
title="Background Scattered Light",
saveToPath=saveToPath,
)
if saveToPath and not isinstance(self.products, bool):
from datetime import datetime
utcnow = datetime.utcnow()
utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S")
self.products = pd.concat(
[
self.products,
pd.Series(
{
"soxspipe_recipe": self.recipeName,
"product_label": f"BKGROUND{self.lamp}",
"file_name": backgroundQCImage,
"file_type": "PDF",
"obs_date_utc": self.dateObs,
"reduction_date_utc": utcnow,
"product_desc": f"Fitted intra-order image background{self.lamp.replace('_', ' ')}",
"file_path": saveToPath,
"label": "QC",
}
)
.to_frame()
.T,
],
ignore_index=True,
)
self.log.debug("completed the ``subtract`` method")
return backgroundFrame, backgroundSubtractedFrame, self.products
[docs]
def mask_order_locations(self, orderPixelTable):
"""*mask the order locations and return the masked frame*
**Key Arguments:**
- ``orderPixelTable`` -- the order location in a pandas datafrmae.
"""
self.log.debug("starting the ``mask_order_locations`` method")
import numpy as np
# MASK DATA INSIDE OF ORDERS (EXPAND THE INNER-ORDER AREA IF NEEDED)
uniqueOrders = np.sort(orderPixelTable["order"].unique())
if self.axisA == "x":
axisALen = self.frame.data.shape[1]
axisBLen = self.frame.data.shape[0]
else:
axisALen = self.frame.data.shape[0]
axisBLen = self.frame.data.shape[1]
oTop = orderPixelTable["order"].min()
oBot = orderPixelTable["order"].max()
for o in uniqueOrders:
axisBcoord = orderPixelTable.loc[(orderPixelTable["order"] == o)][f"{self.axisB}coord"]
axisAcoord_edgeup = orderPixelTable.loc[(orderPixelTable["order"] == o)][f"{self.axisA}coord_edgeup"]
axisAcoord_edgelow = orderPixelTable.loc[(orderPixelTable["order"] == o)][f"{self.axisA}coord_edgelow"]
if o != oBot:
next_axisAcoord_edgeup = orderPixelTable.loc[(orderPixelTable["order"] == o + 1)][
f"{self.axisA}coord_edgeup"
]
bottomGap = axisAcoord_edgelow.values - next_axisAcoord_edgeup.values
expandBottom = np.median(bottomGap) / 2 - 3
if expandBottom < 2:
expandBottom = 2
else:
expandBottom = expandTop
if o != oTop:
previous_axisAcoord_edgelow = orderPixelTable.loc[(orderPixelTable["order"] == o - 1)][
f"{self.axisA}coord_edgelow"
]
topGap = previous_axisAcoord_edgelow.values - axisAcoord_edgeup.values
expandTop = np.median(topGap) / 2 - 3
if expandTop < 2:
expandTop = 2
else:
expandTop = expandBottom
axisAcoord_edgeup += expandTop
axisAcoord_edgelow -= expandBottom
try:
axisAcoord_edgelow, axisAcoord_edgeup, axisBcoord = zip(
*[
(x1, x2, b)
for x1, x2, b in zip(axisAcoord_edgelow, axisAcoord_edgeup, axisBcoord)
if x1 < axisALen and x2 > 0 and x2 < axisALen and b > 0 and b < axisBLen
]
)
except:
continue
for b, u, l in zip(
axisBcoord, np.ceil(axisAcoord_edgeup).astype(int), np.floor(axisAcoord_edgelow).astype(int)
):
if l < 0:
l = 0
if self.axisA == "x":
self.frame.mask[b, l:u] = 1
else:
self.frame.mask[l:u, b] = 1
if o == oTop:
# MASK TOP OF FRAME
mask_bottom = np.array(axisAcoord_edgeup) + 7
for (
b,
m,
) in zip(axisBcoord, np.ceil(mask_bottom).astype(int)):
if self.axisA == "x":
self.frame.mask[b, m:] = 1
else:
self.frame.mask[m:, b] = 1
if o == oBot:
# MASK BOTTOM OF FRAME
mask_top = np.array(axisAcoord_edgelow) - 7
for (
b,
m,
) in zip(axisBcoord, np.ceil(mask_top).astype(int)):
if m < 0:
m = 0
if self.axisA == "x":
self.frame.mask[b, :m] = 1
else:
self.frame.mask[:m, b] = 1
self.log.debug("completed the ``mask_order_locations`` method")
return None
[docs]
def create_background_image(self, rowFitOrder, gaussianSigma):
"""*model the background image from intra-order flux detected*
**Key Arguments:**
- ``rowFitOrder`` -- order of the polynomial fit to flux in each row
- ``gaussianSigma`` -- the sigma of the gaussian used to blur the final image
"""
self.log.debug("starting the ``create_background_image`` method")
from astropy.stats import sigma_clip, mad_std
import numpy as np
import pandas as pd
from astropy.nddata import CCDData
from scipy.signal import medfilt2d
from scipy.interpolate import splrep, splev
import scipy
import numpy.ma as ma
import math
import random
from soxspipe.commonutils.filenamer import filenamer
from os.path import expanduser
maskedImage = np.ma.array(self.frame.data, mask=self.frame.mask)
# SIGMA-CLIP THE DATA
clippedDataMask = sigma_clip(
maskedImage, sigma_lower=10, sigma_upper=50, maxiters=2, cenfunc="median", stdfunc="mad_std"
)
# COMBINE MASK WITH THE BAD PIXEL MASK
mask = (clippedDataMask.mask == 1) | (self.frame.mask == 1)
maskedImage = np.ma.array(self.frame.data, mask=mask)
maskedAsNanImage = self.frame.data.copy()
maskedAsNanImage[mask] = np.nan
maskedAsNanImage[:20, :] = 0
maskedAsNanImage[-20:, :] = 0
maskedAsNanImage[:, -20:] = 0
maskedAsNanImage[:, :20] = 0
import skimage
from skimage.morphology import disk
maskedAsNanImage = skimage.filters.median(maskedAsNanImage, footprint=disk(3))
# REMOVE -VE VALUES
maskedAsNanImage = np.where(maskedAsNanImage < 0, 0, maskedAsNanImage)
# REGENERATE A MASK OF NANS
mask = np.isnan(maskedAsNanImage)
maskedImage = np.ma.array(maskedAsNanImage, mask=mask)
quicklook_image(
log=self.log,
CCDObject=maskedImage,
show=False,
ext=True,
stdWindow=3,
surfacePlot=True,
title="Sigma clipped, blurred masked image",
)
# PLACEHOLDER ARRAY FOR BACKGROUND IMAGE
backgroundMap = np.zeros_like(self.frame)
for idx, row in enumerate(maskedImage):
# SET X TO A MASKED RANGE ... BLANK DATA BUT WITH MASK FROM IMAGE
xunmasked = ma.masked_array(np.linspace(0, len(row), len(row), dtype=int), mask=row.mask)
# fit = ma.polyfit(xunmasked, row, deg=rowFitOrder)
xfit = xunmasked.data
# yfit = np.polyval(fit, xfit)
xmasked = xunmasked[~xunmasked.mask]
xmin = xmasked.min()
xmax = xmasked.max()
rowmasked = row[~row.mask]
window = 7
hw = math.floor(window / 2)
# rowmaskedSmoothed = pd.Series(rowmasked).rolling(window=window, center=True).quantile(.1)
try:
rowmaskedSmoothed = pd.Series(rowmasked).rolling(window=window, center=True).median()
except:
rowmasked = rowmasked.astype(float)
# rowmasked = rowmasked.byteswap().newbyteorder() ## REMOVE IF ABOVE .astype(float) WORKS
rowmaskedSmoothed = pd.Series(rowmasked).rolling(window=window, center=True).median()
rowmaskedSmoothed[:hw] = rowmaskedSmoothed.iloc[hw + 1]
rowmaskedSmoothed[-hw:] = rowmaskedSmoothed.iloc[-hw - 1]
rowmasked[:hw] = rowmasked[hw + 1]
rowmasked[-hw:] = rowmasked[-hw - 1]
rowmaskedSmoothed = np.where(rowmaskedSmoothed < 0, 0, rowmaskedSmoothed)
seedKnots = xmasked[1 : -1 : window * 2]
tck, fp, ier, msg = splrep(xmasked, rowmaskedSmoothed, t=seedKnots, k=rowFitOrder, full_output=True)
t, c, k = tck
if ier == 10:
self.log.info(f"\t\tpoor fit on columns {idx}.\n")
yfit = splev(xfit, tck, ext=3)
# ADD FITTED ROW TO BACKGROUND IMAGE
backgroundMap[idx, :] = yfit
if False and random.randint(1, 401) == 42:
import matplotlib.pyplot as plt
fig, (ax1) = plt.subplots(1, 1, figsize=(30, 15))
plt.scatter(xmasked, rowmasked)
plt.scatter(xmasked, rowmaskedSmoothed)
plt.plot(xfit, yfit, "b")
plt.ylabel("flux")
plt.xlabel("pixels along row")
plt.show()
quicklook_image(
log=self.log,
CCDObject=backgroundMap,
show=False,
ext=None,
surfacePlot=True,
title="Scattered light background image",
)
backgroundMap = scipy.ndimage.filters.gaussian_filter(backgroundMap, gaussianSigma, truncate=2, axes=0)
# SET -VE T0 ZERO
backgroundMap = np.where(backgroundMap < 0, 0, backgroundMap)
quicklook_image(
log=self.log,
CCDObject=backgroundMap,
show=False,
ext=None,
surfacePlot=True,
title="Scattered light background image with median filtering",
)
backgroundMap = CCDData(backgroundMap, unit=self.frame.unit)
self.log.debug("completed the ``create_background_image`` method")
return backgroundMap