Source code for soxspipe.commonutils.data_organiser

#!/usr/bin/env python
# encoding: utf-8
"""
*The SOXSPIPE Data Organiser*

Author
: David Young

Date Created
: March  9, 2023
"""
from line_profiler import profile
from fundamentals import tools
from builtins import object
import sys
import os
from soxspipe.commonutils import uncompress
from soxspipe.commonutils.toolkit import get_calibrations_path

os.environ["TERM"] = "vt100"


[docs] class data_organiser(object): """ *The `soxspipe` Data Organiser* **Key Arguments:** - ``log`` -- logger - ``rootDir`` -- the root directory of the data to process - ``vlt`` -- prepare the workspace using the standard vlt /data directory **Usage:** To setup your logger, settings and database connections, please use the ``fundamentals`` package (see tutorial here https://fundamentals.readthedocs.io/en/master/initialisation.html). To initiate a data_organiser object, use the following: ```python from soxspipe.commonutils import data_organiser do = data_organiser( log=log, rootDir="/path/to/workspace/root/" ) do.prepare() ``` """ def __init__(self, log, rootDir, vlt=False, dbConnect=True): from os.path import expanduser import codecs from fundamentals.logs import emptyLogger import warnings from astropy.utils.exceptions import AstropyWarning import sqlite3 as sql warnings.simplefilter("ignore", AstropyWarning) self.vlt = vlt self.PAE = False log.debug("instantiating a new 'data_organiser' object") self.log = log # MAKE RELATIVE HOME PATH ABSOLUTE if rootDir[0] == "~": home = expanduser("~") directory = directory.replace("~", home) self.rootDir = rootDir self.rawDir = rootDir + "/raw" self.miscDir = rootDir + "/misc" self.sessionsDir = rootDir + "/sessions" if self.vlt: self.vltReduced = self.use_vlt_environment_folders() # SESSION ID PLACEHOLDER FILE self.sessionIdFile = self.sessionsDir + "/.sessionid" exists = os.path.exists(self.sessionIdFile) if exists: with codecs.open(self.sessionIdFile, encoding="utf-8", mode="r") as readFile: sessionId = readFile.read() self.sessionPath = self.sessionsDir + "/" + sessionId self.sessionId = sessionId # DATABASE FILE self.rootDbPath = rootDir + "/soxspipe.db" # RETURN HERE: add these to yaml file # A LIST OF FITS HEADER KEYWORDS LOOKUP KEYS. THESE KEYWORDS WILL BE LIFTED FROM ALL FITS FILES self.keyword_lookups = [ "MJDOBS", "DATE_OBS", "SEQ_ARM", "DPR_CATG", "DPR_TECH", "DPR_TYPE", "PRO_CATG", "PRO_TECH", "PRO_TYPE", "EXPTIME", "WIN_BINX", "WIN_BINY", "DET_READ_SPEED", "SLIT_UVB", "SLIT_VIS", "SLIT_NIR", "LAMP1", "LAMP2", "LAMP3", "LAMP4", "LAMP5", "LAMP6", "LAMP7", "DET_READ_TYPE", "CONAD", "GAIN", "RON", "OBS_ID", "OBS_NAME", "NAXIS", "OBJECT", "TPL_ID", "INSTRUME", "ABSROT", "EXPTIME2", "TPL_NAME", "TPL_NEXP", "TPL_EXPNO", "ACFW_ID", "RA", "DEC", "DET", "SWSIM_NIR", "SWSIM_VIS", "SWSIM_UVB", "PAR_ANG_END", "PAR_ANG_START", "AZ_ANG", "ALT_ANG", "SEEING_END", "SEEING_START", "AIRMASS_END", "AIRMASS_START", "TARG_NAME", "OB_TPL_NO", "OB_NTPL", "OB_START", "TPL_START", "NIR_TEMP_K", "VIS_TEMP_C", "CP_TEMP_C", "AFC1_POS1", "AFC1_POS2", "AFC2_POS1", "AFC2_POS2", ] # THE MINIMUM SET OF KEYWORD WE EVER WANT RETURNED # USED IN GROUPING RAW FRAMES # RETURN HERE: add these to yaml file self.rawFrameGroupKeywords = [ "file", "eso seq arm", "eso dpr catg", "eso dpr type", "eso dpr tech", "eso pro catg", "eso pro tech", "eso pro type", "eso obs id", "eso obs name", "exptime", "binning", "rospeed", "slit", "slitmask", "lamp", "gain", "night start date", "night start mjd", "mjd-date", "mjd-obs", "date-obs", "object", "template", "instrume", "absrot", "eso tpl name", "eso tpl nexp", "eso tpl expno", "filter", "ra", "dec", "simulation", "filepath", "eso tel parang end", "eso tel parang start", "eso tel az", "eso tel alt", "eso tel ambi fwhm end", "eso tel ambi fwhm start", "eso tel airm end", "eso tel airm start", "eso obs targ name", "eso obs tplno", "eso obs ntpl", "eso tpl start", "eso obs start", "nir temp k", "vis temp c", "cp temp c", "afc1 pos1", "afc1 pos2", "afc2 pos1", "afc2 pos2", ] self.proKeywords = ["eso pro type", "eso pro tech", "eso pro catg"] # THESE ARE KEYS WE NEED TO FILTER ON, AND SO NEED TO CREATE ASTROPY TABLE INDEXES self.filterKeywords = [ "eso seq arm", "eso dpr catg", "eso dpr tech", "eso dpr type", "eso pro catg", "eso pro tech", "eso pro type", "exptime", "rospeed", "slit", "slitmask", "gain", "binning", "night start mjd", "night start date", "instrume", "lamp", "template", "eso obs name", "eso obs id", "eso tpl name", "eso tpl nexp", "filter", "object", ] self.filterKeywordsExtras = [ "mjd-obs", "nir temp k", "vis temp c", "cp temp c", "afc1 pos1", "afc1 pos2", "afc2 pos1", "afc2 pos2", ] self.productFilterKeywords = [ "eso seq arm", "eso pro catg", "eso pro tech", "eso pro type", ] # THIS IS THE ORDER TO PROCESS THE FRAME TYPES self.reductionOrder = [ "BIAS", "DARK", "LAMP,FMTCHK", "LAMP,ORDERDEF", "LAMP,DORDERDEF", "LAMP,QORDERDEF", "LAMP,FLAT", "FLAT,LAMP", "DOME,FLAT", "LAMP,DFLAT", "LAMP,QFLAT", "WAVE,LAMP", "LAMP,WAVE", "STD,FLUX", "STD", "STD,TELLURIC", "OBJECT", "OBJECT,ASYNC", ] # THIS IS THE ORDER THE RECIPES NEED TO BE RUN IN (MAKE SURE THE REDUCTION SCRIPT HAS RECIPES IN THE CORRECT ORDER) self.recipeOrder = [ "mbias", "mdark", "disp_sol", "order_centres", "mflat", "spat_sol", "stare", "nod", "offset", ] # DECOMPRESS .Z FILES from soxspipe.commonutils import uncompress uncompress(log=self.log, directory=self.rootDir) exists = os.path.exists(self.rootDbPath) if exists and dbConnect: self.conn, reset = self._get_or_create_db_connection() else: self.conn = None return None
[docs] def prepare(self, refresh=False, report=True): """*Prepare the workspace for data reduction by generating all SOF files and reduction scripts.* **Key Arguments:** - ``refresh`` -- trigger a complete refresh the workspace during preparation (delete database and do a complete prepare) """ self.log.debug("starting the ``prepare`` method") import codecs import glob if refresh: # DELETE THE SQLITE DATABASE IF IT EXISTS exists = os.path.exists(self.rootDbPath) self.conn = None if exists: os.remove(self.rootDbPath) print("The existing database has been removed to allow a complete refresh of the workspace.") try: os.remove(self.rootDbPath + "-shm") except: pass try: os.remove(self.rootDbPath + "-wal") except: pass # DELETE ALL ERROR LOG AND SOF FILES if False: for root, dirs, files in os.walk(os.path.abspath(self.rootDir)): for file in files: if file.endswith("ERROR.log") or file.endswith(".sof"): os.remove(os.path.join(root, file)) self._select_instrument() # TEST FITS FILES OR raw_frames DIRECT EXISTS fitsExist = self._fits_files_exist() # EXIST IF NO FITS FILES EXIST - SOME PROTECTION AGAINST MOVING USER FILES IF THEY MAKE A MISTAKE PREPARE A WORKSPACE IN THE WRONG LOCATION if fitsExist == False: print("There are no FITS files in this directory. Please add your data before running `soxspipe prep`") sys.exit() return None # MK RAW FRAME DIRECTORY if not os.path.exists(self.rawDir): os.makedirs(self.rawDir) # TEST FOR SQLITE DATABASE - ADD IF MISSING self.conn, reset = self._get_or_create_db_connection() # MK SESSION DIRECTORY if not os.path.exists(self.sessionsDir): os.makedirs(self.sessionsDir) basename = os.path.basename(self.rootDir) print(f"PREPARING THE `{basename}` WORKSPACE FOR DATA-REDUCTION") self._sync_raw_frames() self._move_misc_files() # IF SESSION ID FILE DOES NOT EXIST, CREATE A NEW SESSION # OTHERWISE USE CURRENT SESSION exists = os.path.exists(self.sessionIdFile) if not exists: sessionId = self.session_create(sessionId="base") self.sessionId = sessionId else: with codecs.open(self.sessionIdFile, encoding="utf-8", mode="r") as readFile: sessionId = readFile.read() self.sessionPath = self.sessionsDir + "/" + sessionId # GET SETTINGS settingsPath = self.sessionPath + "/soxspipe.yaml" su = tools( arguments={"<workspaceDirectory>": self.sessionPath, "settingsFile": settingsPath}, docString=False, logLevel="WARNING", options_first=False, projectName="soxspipe", createLogger=False, ) arguments, self.settings, replacedLog, dbConn = su.setup() if True: c = self.conn.cursor() sqlQueries = [ f'update product_frames set status_{self.sessionId} = "pass" where status_{self.sessionId} = "fail" and sof in (select sof_name from quality_control);', f"update quality_control set qc_value_min = null, qc_value_max = null, qc_flag = 'pass';", ] for k, v in self.settings.items(): if k[:5] == "soxs-": recipe = k for a in ["acq", "vis", "nir"]: if a in v and "qc-acceptable-ranges" in v[a]: arm = a.upper() for kk, vv in v[a]["qc-acceptable-ranges"].items(): qc_name = kk.upper().replace("-", " ") qc_min = vv[0] qc_max = vv[1] sqlQueries.append( f'update quality_control set qc_value_min = {qc_min}, qc_value_max = {qc_max} where soxspipe_recipe = "{recipe}" and sof_name like "%{arm}%" and qc_name = "{qc_name}" and qc_order = "-1";' ) if "qc-acceptable-ranges" in v: for kk, vv in v["qc-acceptable-ranges"].items(): qc_name = kk.upper().replace("-", " ") qc_min = vv[0] qc_max = vv[1] sqlQueries.append( f'update quality_control set qc_value_min = {qc_min}, qc_value_max = {qc_max} where soxspipe_recipe = "{recipe}" and qc_name = "{qc_name}" and qc_order = "-1";' ) sqlQueries += [ 'update quality_control set qc_flag = "pass" where CAST(qc_value as float) < qc_value_max and CAST(qc_value as float) > qc_value_min and qc_flag != "pass" and qc_order = "-1";', 'update quality_control set qc_flag = "fail" where (CAST(qc_value as float) > qc_value_max or CAST(qc_value as float) < qc_value_min) and qc_flag != "fail" and qc_order = "-1";', 'update product_frames set status_base = "fail" where sof in (select sof_name from quality_control where qc_flag = "fail");', ] for sqlQuery in sqlQueries: c.execute(sqlQuery) self.conn.commit() c.close() self._flag_files_to_ignore() self.build_sof_files() self.build_sof_files() if report: rawDirStr = self.rawDir.replace("./", "") print(f"\nTHE `{basename}` WORKSPACE FOR HAS BEEN PREPARED FOR DATA-REDUCTION\n") print(f"In this workspace you will find:\n") print(f" - `misc/`: a lost-and-found archive of non-fits files") print(f" - `qc/`: nested folders, ordered by date, containing quality-control plots and tables.") print(f" - `{rawDirStr}/`: nested folders, ordered by date, containing raw-frames.") print(f" - `sessions/`: directory of data-reduction sessions") print(f" - `sof/`: the set-of-files (sof) files required for each reduction step") print(f" - `soxspipe.db`: a sqlite database needed by the data-organiser, please do not delete") print(f" - `reduced/`: nested folders, ordered by date, containing reduced data.\n") incompleteSets = self.get_incomplete_raw_frames_set() if len(incompleteSets.index): from tabulate import tabulate print("SOME CALIBRATION FRAMES ARE NOT PRESENT FOR THE FOLLOWING DATA SETS AND THEY CANNOT BE REDUCED:") print(tabulate(incompleteSets, headers="keys", tablefmt="psql", showindex=False)) self.conn.close() self.log.debug("completed the ``prepare`` method") return None
[docs] def list_obs(self): """*list all observation names and IDs in the current workspace*""" import pandas as pd self.log.debug("starting the ``list_obs`` method") import pandas as pd query = 'select distinct "eso seq arm", "night start date", "night start mjd", "eso obs name","eso obs id" from raw_frame_sets where complete = 1 and recipe in ("nod_obj","stare_obj","offset_obj") and `eso dpr type` like "%OBJECT%" order by "mjd-obs" asc' obsDf = pd.read_sql(query, con=self.conn) if len(obsDf.index): from tabulate import tabulate print(f"THE CURRENT WORKSPACE CONTAINS {len(obsDf.index)} SCIENCE OBSERVATION BLOCKS:") print(tabulate(obsDf, headers="keys", tablefmt="psql", showindex=False)) self.log.debug("completed the ``list_obs`` method") return obsDf
[docs] def list_sofs(self): """*list all science object SOF files in the current workspace*""" import pandas as pd self.log.debug("starting the ``list_sofs`` method") query = 'select distinct "eso seq arm", "night start date", round("mjd-obs",1) as "mjd-obs", "eso obs name","eso obs id", "slit", "sof" from raw_frame_sets where complete = 1 and recipe in ("nod_obj","stare_obj","offset_obj") and `eso dpr type` like "%OBJECT%" order by "mjd-obs" asc' sofDf = pd.read_sql(query, con=self.conn) if len(sofDf.index): from tabulate import tabulate print(f"THE CURRENT WORKSPACE CONTAINS {len(sofDf.index)} SCIENCE SOF FILES:") print(tabulate(sofDf, headers="keys", tablefmt="psql", showindex=False)) self.log.debug("completed the ``list_sofs`` method") return sofDf
[docs] def list_raw(self, sofFile): """*list the all the raw frames associated with a given science object SOF file*""" import pandas as pd self.log.debug("starting the ``list_raw`` method") sqlQuery = f"select sof from product_frames where sof = '{sofFile}' and complete = 1" for _ in range(4): # Recursively query up to 5 times sqlQuery = f"SELECT distinct sof FROM product_frames WHERE file IN (SELECT file FROM sof_map_base WHERE sof in ({sqlQuery})) or sof in ({sqlQuery})" sqlQuery = f"SELECT * from sof_map WHERE sof in ({sqlQuery}) and filepath like '%./raw/%' order by sof" table = pd.read_sql(sqlQuery, con=self.conn) filepaths = table["filepath"].tolist() self.log.debug("completed the ``list_raw`` method") return list(set(filepaths)), table
def _sync_raw_frames(self, skipSqlSync=False): """*sync the raw frames between the project folder and the database* **Key Arguments:** - ``skipSqlSync`` -- skip the SQL db sync (used only in secondary clean-up scan) **Return:** - None **Usage:** ```python from soxspipe.commonutils import data_organiser do = data_organiser( log=log, rootDir="/path/to/root/folder/" ) do._sync_raw_frames() ``` """ self.log.debug("starting the ``_sync_raw_frames`` method") import shutil import pandas as pd import re remainingFiles = 1 firstPass = True while remainingFiles > 0: # GENERATE AN ASTROPY TABLE OF FITS FRAMES WITH ALL INDEXES NEEDED rawFrames, fitsPaths, remainingFiles = self._create_directory_table( pathToDirectory=self.rootDir, filterKeys=self.filterKeywords ) if not remainingFiles: remainingFiles = 0 elif remainingFiles > 0: if firstPass: firstPass = False else: sys.stdout.flush() sys.stdout.write("\x1b[1A\x1b[2K") print(remainingFiles, "FITS files remaining to be indexed") if fitsPaths: conn = self.conn knownRawFrames = pd.read_sql("SELECT * FROM raw_frames", con=conn) # SPLIT INTO RAW, REDUCED PIXELS, REDUCED TABLES rawFrames = self._populate_raw_frames_extra_columns(rawFrames) # FIND AND REMOVE DUPLICATE FILES if len(rawFrames.index): mask = rawFrames["filepath"].isnull() rawFrames.loc[mask, "filepath"] = ( "./raw/" + rawFrames.loc[mask, "mjd-date"] + "/" + rawFrames.loc[mask, "file"] ) # FIND AND REMOVE DUPLICATE FILES matchedFiles = pd.merge(rawFrames, knownRawFrames, on=["file", "eso dpr tech"], how="inner") if len(matchedFiles.index): for file in matchedFiles["file"]: try: os.remove(file) except: pass # FIND RECORDS IN THE FILE SYSTEM NOT YET IN THE DATABASE rawFrames = rawFrames[ ~rawFrames.set_index(["file", "eso dpr tech"]).index.isin( knownRawFrames.set_index(["file", "eso dpr tech"]).index ) ] # ADD THE NEWLY FOUND FRAMES TO THE DATABASE databaseDeletes = [] if len(rawFrames.index): # NOW MAKE FILEPATHS RELATIVE TO THE rawDir rawFrames.replace(["--", -99.99], None).to_sql( "raw_frames", con=self.conn, index=False, if_exists="append" ) # MOVE THE FILES TO THE CORRECT LOCATION filepaths = rawFrames["filepath"] filenames = rawFrames["file"] for p, n in zip(filepaths, filenames): parentDirectory = os.path.dirname(p) if not os.path.exists(parentDirectory): # Recursively create missing directories os.makedirs(parentDirectory) if os.path.exists(self.rootDir + "/" + n): realSource = os.path.realpath(self.rootDir + "/" + n) realDest = os.path.realpath(p) matchObject = re.match(r".*?(raw\/\d{4}-\d{2}-\d{2}.*)", realSource) if matchObject and realSource != realDest: # FILE NOT WHERE THEY SHOULD BE - DELETE FROM DATABASE databaseDeletes.append(realDest) elif realSource != realDest: shutil.move(realSource, realDest) if os.path.islink(self.rootDir + "/" + n): os.remove(self.rootDir + "/" + n) if len(databaseDeletes): databaseDeletes = (", ").join(databaseDeletes) c = self.conn.cursor() sqlQuery = f'delete from raw_frames where filepath in ("{databaseDeletes}");' c.execute(sqlQuery) c.close() if not skipSqlSync: self._sync_sql_table_to_directory(self.rawDir, "raw_frames", recursive=False) self.log.debug("completed the ``_sync_raw_frames`` method") return None def _create_directory_table(self, pathToDirectory, filterKeys, limit=10000): """*create an astropy table based on the contents of a directory* **Key Arguments:** - `log` -- logger - `pathToDirectory` -- path to the directory containing the FITS frames - `filterKeys` -- these are the keywords we want to filter on later - `limit` -- maximum number of files to process in one go (to avoid memory issues) **Return** - `rawFrames` -- the primary dataframe table listing all FITS files in the directory (including indexes on `filterKeys` columns) - `fitsPaths` -- a simple list of all FITS file paths - `remainingFiles` -- number of remaining files not included in the table due to the limit **Usage:** ```python # GENERATE AN ASTROPY TABLES OF FITS FRAMES WITH ALL INDEXES NEEDED rawFrames, fitsPaths, remainingFiles = _create_directory_table( log=log, pathToDirectory="/my/directory/path", keys=["file","mjd-obs", "exptime","cdelt1", "cdelt2"], filterKeys=["mjd-obs","exptime"] ) ``` """ self.log.debug("starting the ``_create_directory_table`` function") from ccdproc import ImageFileCollection from astropy.time import Time import numpy as np import pandas as pd from soxspipe.commonutils import keyword_lookup # GENERATE A LIST OF FITS FILE PATHS fitsPaths = [] fitsPathsRel = [] fitsNames = [] for entry in os.scandir(pathToDirectory): if ( not entry.name.startswith(".") and entry.is_file() and (os.path.splitext(entry.name)[1] == ".fits" or ".fits.Z" in entry.name) ): # fitsPaths.append(entry.path) if os.path.islink(entry.path): fp = "./" + os.path.relpath(os.path.realpath(entry.path), pathToDirectory) else: fp = os.path.relpath(entry.path, pathToDirectory) fitsPaths.append(fp) fitsNames.append(entry.name) remainingFiles = max(0, len(fitsPaths) - limit) fitsPaths = fitsPaths[:limit] fitsNames = fitsNames[:limit] recursive = False if len(fitsPaths) == 0: return None, None, None # INSTRUMENT CHECK if recursive: allFrames = ImageFileCollection(filenames=fitsPaths[:3], keywords=["instrume"]) else: allFrames = ImageFileCollection(location=pathToDirectory, filenames=fitsNames[:3], keywords=["instrume"]) tmpTable = allFrames.summary tmpTable["instrume"].fill_value = "--" instrument = tmpTable["instrume"].filled() instrument = list(set(instrument)) if "--" in instrument: instrument.remove("--") if len(instrument) == 2 and "SHOOT" in instrument and "XSHOOTER" in instrument: instrument = ["XSH"] if len(instrument) > 1: self.log.error( f"The directory contains data from a mix of instruments. Please only provide data from either SOXS or XSH" ) raise AssertionError else: self.instrument = instrument[0] self._select_instrument(inst=self.instrument) if remainingFiles < 1: print(f"The instrument has been set to '{self.instrument}'") # KEYWORD LOOKUP OBJECT - LOOKUP KEYWORD FROM DICTIONARY IN RESOURCES # FOLDER self.kw = keyword_lookup(log=self.log, instrument=self.instrument).get self.keywords = ["file"] for k in self.keyword_lookups: try: self.keywords.append(self.kw(k).lower()) except Exception as e: self.log.warning(f"Keyword '{k}' not found in lookup table.") # TOP-LEVEL COLLECTION if recursive: allFrames = ImageFileCollection(filenames=fitsPaths, keywords=self.keywords) rawFrames = allFrames.summary else: # Split fitsNames into batches of 100 batch_size = 1000 batches = [fitsPaths[i : i + batch_size] for i in range(0, len(fitsPaths), batch_size)] from fundamentals import fmultiprocess results = fmultiprocess( log=self.log, function=_harvest_fits_headers, inputArray=batches, poolSize=False, timeout=300, pathToDirectory=pathToDirectory, keywords=self.keywords, filterKeys=filterKeys, instrument=self.instrument, kw=self.kw, turnOffMP=False, progressBar=True, ) rawFrames = pd.concat(results) # # FIX BOUNDARY GROUP FILES -- MOVE TO NEXT DAY SO THEY GET COMBINED WITH THE REST OF THEIR GROUP # # E.G. UVB BIASES TAKEN ACROSS THE BOUNDARY BETWEEN 2 NIGHTS # # FIRST FIND END OF NIGHT DATA - AND PUSH TO THE NEXT DAY # mask = rawFrames["boundary"] > 0.96 # filteredDf = rawFrames.loc[mask].copy() # filteredDf["night start mjd"] = filteredDf["night start mjd"] + 1 # mask = filteredDf["eso dpr type"].isin(["LAMP,DFLAT", "LAMP,QFLAT"]) # filteredDf.loc[mask, "eso dpr type"] = "LAMP,FLAT" # # NOW FIND START OF NIGHT DATA # mask = rawFrames["boundary"] < 0.04 # filteredDf2 = rawFrames.loc[mask].copy() # mask = filteredDf2["eso dpr type"].isin(["LAMP,DFLAT", "LAMP,QFLAT"]) # filteredDf2.loc[mask, "eso dpr type"] = "LAMP,FLAT" # # NOW FIND MATCHES BETWEEN 2 DATASETS # theseKeys = [ # "eso seq arm", # "eso dpr catg", # "eso dpr tech", # "eso dpr type", # "eso pro catg", # "eso pro tech", # "eso pro type", # "night start mjd", # ] # matched = pd.merge(filteredDf, filteredDf2, on=theseKeys) # boundaryFiles = np.unique(matched["file_x"].values) # mask = rawFrames["file"].isin(boundaryFiles) # rawFrames.loc[mask, "night start mjd"] += 1 # rawFrames["night start date"] = Time(rawFrames["night start mjd"], format="mjd").to_value("iso", subfmt="date") self.log.debug("completed the ``_create_directory_table`` function") return rawFrames, fitsPaths, remainingFiles def _sync_sql_table_to_directory(self, directory, tableName, recursive=False): """*sync sql table content to files in a directory (add and delete from table as appropriate)* **Key Arguments:** - ``directory`` -- the directory of fits file to inspect. - ``tableName`` -- the sqlite table to sync. - ``recursive`` -- recursively dig into the directory to find FITS files? Default *False*. **Return:** - None **Usage:** ```python do._sync_sql_table_to_directory('/raw/directory/', 'raw_frames', recursive=False) ``` """ self.log.debug("starting the ``_sync_sql_table_to_directory`` method") import sqlite3 as sql import shutil # GENERATE A LIST OF FITS FILE PATHS IN RAW DIR from fundamentals.files import recursive_directory_listing fitsPaths = recursive_directory_listing( log=self.log, baseFolderPath=directory, whatToList="files" # all | files | dirs ) c = self.conn.cursor() sqlQuery = f"select filepath from {tableName};" c.execute(sqlQuery) # MAKE PATHS ABSOLUTE dbFiles = [r[0].replace("//", "/") for r in c.fetchall()] # DELETED FILES filesNotInDB = list(set(fitsPaths) - set(dbFiles)) filesNotInFS = list(set(dbFiles) - set(fitsPaths)) # MAKE PATHS RELATIVE TO rawDir filesNotInFS = [f.replace(self.rawDir + "/", "./raw/").replace("//", "/") for f in filesNotInFS] if len(filesNotInFS): filesNotInFS = ("','").join(filesNotInFS) sqlQuery = f"delete from {tableName} where filepath in ('{filesNotInFS}');" c.execute(sqlQuery) sqlQuery = f"delete from sof_map_{self.sessionId} where sof in (select sof from sof_map_{self.sessionId} where filepath in ('{filesNotInFS}'));" c.execute(sqlQuery) if len(filesNotInDB): for f in filesNotInDB: # GET THE EXTENSION (WITH DOT PREFIX) basename = os.path.basename(f) extension = os.path.splitext(basename)[1] if extension.lower() != ".fits": pass elif self.rootDir in f: exists = os.path.exists(os.path.abspath(self.rootDir) + "/" + basename) if not exists: os.symlink(os.path.realpath(f), os.path.abspath(self.rootDir) + "/" + basename) else: shutil.move(self.rootDir + "/" + f, self.rootDir) self._sync_raw_frames(skipSqlSync=True) c.close() self.log.debug("completed the ``_sync_sql_table_to_directory`` method") return None def _populate_raw_frames_extra_columns(self, filteredFrames, verbose=False): """*populate extra columns for raw frames to later filter on* **Key Arguments:** - ``filteredFrames`` -- the dataframe from which to split frames into categorise. - ``verbose`` -- print results to stdout. **Return:** - ``rawFrames`` -- dataframe of raw frames only **Usage:** ```python rawFrames = self._populate_raw_frames_extra_columns(filteredFrames) ``` """ self.log.debug("starting the ``catagorise_frames`` method") from astropy.table import Table, unique import numpy as np import pandas as pd from tabulate import tabulate # SPLIT INTO RAW, REDUCED PIXELS, REDUCED TABLES rawFrameGroupKeywords = self.rawFrameGroupKeywords[:] filterKeywordsRaw = self.filterKeywords[:] filteredFrames["slit"] = "--" filteredFrames["slitmask"] = "--" filteredFrames["lamp"] = "--" filteredFrames["simulation"] = "--" filteredFrames["gain"] = -99.99 # ADD SLIT FOR SPECTROSCOPIC DATA filteredFrames.loc[(filteredFrames["eso seq arm"] == "NIR"), "slit"] = filteredFrames.loc[ (filteredFrames["eso seq arm"] == "NIR"), self.kw("SLIT_NIR").lower() ] filteredFrames.loc[(filteredFrames["eso seq arm"] == "VIS"), "slit"] = filteredFrames.loc[ (filteredFrames["eso seq arm"] == "VIS"), self.kw("SLIT_VIS").lower() ] filteredFrames.loc[(filteredFrames["eso seq arm"] == "UVB"), "slit"] = filteredFrames.loc[ (filteredFrames["eso seq arm"] == "UVB"), self.kw("SLIT_UVB").lower() ] # CHECK GAIN AND CONAD ARE CORRECTLY POPULATED filteredFrames["gain"] = filteredFrames[self.kw("CONAD").lower()] mask = filteredFrames[self.kw("GAIN").lower()] > filteredFrames[self.kw("CONAD").lower()] filteredFrames.loc[mask, "gain"] = filteredFrames.loc[mask, self.kw("GAIN").lower()] # ADD SIMULATION FLAG FOR SPECTROSCOPIC DATA (AND MORE) if self.instrument.lower() == "soxs": filteredFrames.loc[(filteredFrames["eso seq arm"] == "NIR"), "simulation"] = filteredFrames.loc[ (filteredFrames["eso seq arm"] == "NIR"), self.kw("SWSIM_NIR").lower() ] filteredFrames.loc[(filteredFrames["eso seq arm"] == "VIS"), "simulation"] = filteredFrames.loc[ (filteredFrames["eso seq arm"] == "VIS"), self.kw("SWSIM_VIS").lower() ] filteredFrames.loc[(filteredFrames["eso seq arm"] == "UVB"), "simulation"] = filteredFrames.loc[ (filteredFrames["eso seq arm"] == "UVB"), self.kw("SWSIM_UVB").lower() ] filteredFrames.loc[(filteredFrames["simulation"] == "--"), "simulation"] = 0 filteredFrames.loc[(filteredFrames["simulation"] == -99.99), "simulation"] = 0 filteredFrames.loc[(filteredFrames["simulation"] == "T"), "simulation"] = 1 filteredFrames = filteredFrames.rename( columns={ "eso ins temp217 val": "nir temp k", "eso ins temp104 val": "vis temp c", "eso ins temp301 val": "cp temp c", "eso ins afc1 pos1": "afc1 pos1", "eso ins afc1 pos2": "afc1 pos2", "eso ins afc2 pos1": "afc2 pos1", "eso ins afc2 pos2": "afc2 pos2", } ) else: filteredFrames["simulation"] = 0 filteredFrames["nir temp k"] = 0 filteredFrames["vis temp c"] = 0 filteredFrames["cp temp c"] = 0 filteredFrames["afc1 pos1"] = 0 filteredFrames["afc1 pos2"] = 0 filteredFrames["afc2 pos1"] = 0 filteredFrames["afc2 pos2"] = 0 filteredFrames.loc[ ((filteredFrames["slit"].str.contains("MULT")) & (filteredFrames["slitmask"] == "--")), "slitmask" ] = "MPH" filteredFrames.loc[ ((filteredFrames["slit"].str.contains("PINHOLE")) & (filteredFrames["slitmask"] == "--")), "slitmask" ] = "PH" filteredFrames.loc[ ((filteredFrames["slit"].str.contains("SLIT")) & (filteredFrames["slitmask"] == "--")), "slitmask" ] = "SLIT" lampLong = ["argo", "merc", "neon", "xeno", "qth", "deut", "thar"] lampEle = ["Ar", "Hg", "Ne", "Xe", "QTH", "D", "ThAr"] for i in [1, 2, 3, 4, 5, 6, 7]: lamp = self.kw(f"LAMP{i}").lower() if self.instrument.lower() == "soxs": for l, e in zip(lampLong, lampEle): if l in lamp: lamp = e filteredFrames.loc[ ((filteredFrames[self.kw(f"LAMP{i}").lower()] != -99.99) & (filteredFrames["lamp"] != "--")), "lamp" ] += lamp filteredFrames.loc[ ((filteredFrames[self.kw(f"LAMP{i}").lower()] != -99.99) & (filteredFrames["lamp"] == "--")), "lamp" ] = lamp filteredFrames.loc[ ((filteredFrames[self.kw("DPR_TYPE").lower()] == "DOME,FLAT") & (filteredFrames["lamp"] == "--")), "lamp", ] = "DOME" else: filteredFrames.loc[((filteredFrames[self.kw(f"LAMP{i}").lower()] != -99.99)), "lamp"] = ( filteredFrames.loc[ ((filteredFrames[self.kw(f"LAMP{i}").lower()] != -99.99)), self.kw(f"LAMP{i}").lower() ] ) mask = [] for i in self.proKeywords: rawFrameGroupKeywords.remove(i) filterKeywordsRaw.remove(i) if not len(mask): mask = filteredFrames[i] == "--" else: mask = np.logical_and(mask, (filteredFrames[i] == "--")) filteredFrames["lamp"] = filteredFrames["lamp"].str.replace("_lamp", "") filteredFrames["lamp"] = filteredFrames["lamp"].str.replace("_Lamp", "") rawFrames = filteredFrames.loc[mask] # MATCH OFF FRAMES TO ADD THE MISSING LAMPS mask = rawFrames["eso obs name"] == "Maintenance" rawFrames.loc[mask, "eso obs name"] = rawFrames.loc[mask, "eso obs name"] + rawFrames.loc[mask, "eso dpr type"] if self.instrument.lower() == "soxs": groupBy = "eso obs name" else: groupBy = "template" rawFrames.loc[(rawFrames["lamp"] == "--"), "lamp"] = np.nan rawFrames.loc[(rawFrames["eso seq arm"].str.lower() == "nir"), "lamp"] = rawFrames.loc[ (rawFrames["eso seq arm"].str.lower() == "nir"), "lamp" ].fillna( rawFrames.loc[(rawFrames["eso seq arm"].str.lower() == "nir")].groupby(groupBy)["lamp"].transform("first") ) rawFrames.loc[(rawFrames["lamp"].isnull()), "lamp"] = "--" rawFrames["exptime"] = rawFrames["exptime"].apply(lambda x: round(x, 2)) rawGroups = self._group_raw_frames(rawFrames, filterKeywordsRaw, addFilepaths=False, addStartDate=False) if verbose: print("\n# CONTENT FILE INDEX\n") if verbose and len(rawGroups.index): print("\n## ALL RAW FRAMES\n") print( tabulate( rawFrames[rawFrameGroupKeywords], headers="keys", tablefmt="github", showindex=False, stralign="right", floatfmt=".3f", ) ) self.log.debug("completed the ``catagorise_frames`` method") return rawFrames[rawFrameGroupKeywords].replace(["--"], None) def _move_misc_files(self): """*move extra/miscellaneous files to a misc directory*""" self.log.debug("starting the ``_move_misc_files`` method") import shutil if not os.path.exists(self.miscDir): os.makedirs(self.miscDir) # GENERATE A LIST OF FILE PATHS pathToDirectory = self.rootDir allowlistExtensions = [".db", ".yaml", ".log", ".sh", ".py"] for d in os.listdir(pathToDirectory): filepath = os.path.join(pathToDirectory, d) if os.path.splitext(filepath)[1] in allowlistExtensions: continue if ( os.path.isfile(filepath) and os.path.splitext(filepath)[1] != ".db" and "readme." not in d.lower() and "soxspipe" not in d.lower() ): shutil.move(filepath, self.miscDir + "/" + d) self.log.debug("completed the ``_move_misc_files`` method") return None def _write_sof_files(self): """*Write out all possible SOF files from the sof_map database table* **Key Arguments:** # - **Return:** - None """ self.log.debug("starting the ``_write_sof_files`` method") import pandas as pd from tabulate import tabulate conn, reset = self._get_or_create_db_connection() # RECURSIVELY CREATE MISSING DIRECTORIES self.sofDir = self.sessionPath + "/sof" if not os.path.exists(self.sofDir): os.makedirs(self.sofDir) df = pd.read_sql_query(f"select * from sof_map_{self.sessionId} where complete = 1;", conn) # GROUP RESULTS for name, group in df.groupby("sof"): sofPath = self.sofDir + "/" + name if os.path.exists(sofPath): continue myFile = open(sofPath, "w") content = tabulate(group[["filepath", "tag"]], tablefmt="plain", showindex=False) myFile.write(content) myFile.close() self.log.debug("completed the ``_write_sof_files`` method") return None
[docs] def session_create(self, sessionId=False): """*create a data-reduction session with accompanying settings file and required directories* **Key Arguments:** - ``sessionId`` -- optionally provide a sessionId (A-Z, a-z 0-9 and/or _- allowed, 16 character limit) **Return:** - ``sessionId`` -- the unique ID of the data-reduction session **Usage:** ```python do = data_organiser( log=log, rootDir="/path/to/workspace/root/" ) sessionId = do.session_create(sessionId="my_supernova") ``` """ self.log.debug("starting the ``session_create`` method") import re import shutil import sqlite3 as sql rootDbExists = os.path.exists(self.rootDbPath) if rootDbExists: # CREATE THE DATABASE CONNECTION self.conn, reset = self._get_or_create_db_connection() c = self.conn.cursor() sqlQuery = "select distinct instrume from raw_frames" c.execute(sqlQuery) inst = c.fetchall()[0][0].lower() c.close() # TEST SESSION DIRECTORY EXISTS exists = os.path.exists(self.sessionsDir) if not exists: print("Please prepare your workspace using the `soxspipe prep` command before creating a new session.") sys.exit(0) if sessionId: if len(sessionId) > 16: print("Session ID must be 16 characters long or shorter, consisting of A-Z, a-z, 0-9 and/or _-") matchObjectList = re.findall(r"[^0-9a-zA-Z\-\_]+", sessionId) if matchObjectList: print("Session ID must be 16 characters long or shorter, consisting of A-Z, a-z, 0-9 and/or _-") else: # CREATE SESSION ID FROM TIME STAMP from datetime import datetime, date, time now = datetime.now() sessionId = now.strftime("%Y%m%dt%H%M%S") self.sessionId = sessionId # MAKE THE SESSION DIRECTORY self.sessionPath = self.sessionsDir + "/" + sessionId if not os.path.exists(self.sessionPath): os.makedirs(self.sessionPath) # SETUP SESSION SETTINGS AND LOGGING testPath = self.sessionPath + "/soxspipe.yaml" exists = os.path.exists(testPath) if not exists: if "shoo" in inst: inst = "xsh" su = tools( arguments={"<workspaceDirectory>": self.sessionPath, "init": True, "settingsFile": None}, docString=False, logLevel="WARNING", options_first=False, projectName="soxspipe", defaultSettingsFile=f"{inst}_default_settings.yaml", createLogger=False, ) arguments, settings, replacedLog, dbConn = su.setup() # MAKE ASSET PLACEHOLDERS if self.vlt: dest = self.sessionPath + "/reduced" try: os.symlink(self.vltReduced, dest) except: pass folders = ["sof", "qc", "reduced"] for f in folders: if not os.path.exists(self.sessionPath + f"/{f}"): os.makedirs(self.sessionPath + f"/{f}") # ADD A NEW STATUS COLUMN IN product_frames FOR THIS SESSION import sqlite3 as sql conn, reset = self._get_or_create_db_connection() c = conn.cursor() sqlQuery = f"ALTER TABLE product_frames ADD status_{sessionId} TEXT;" try: c.execute(sqlQuery) except: pass # DUPLICATE TEH SOF_MAP TABLE sqlQuery = "SELECT sql FROM sqlite_master WHERE type='table' AND name='z_sof_map'" c.execute(sqlQuery) sqlQuery = c.fetchall()[0][0] sqlQuery = sqlQuery.replace("z_sof_map", f"sof_map_{sessionId}") try: c.execute(sqlQuery) except: pass sqlQueries = [f"DROP VIEW IF EXISTS sof_map;", f"CREATE VIEW sof_map as select * from sof_map_{sessionId};"] for sqlQuery in sqlQueries: c.execute(sqlQuery) c.close() self._write_sof_files() # _reduce_all.sh HAS BEEN REPLACED WITH THE `SOXSPIPE REDUCE` COMMAND # self._write_reduction_shell_scripts() self._symlink_session_assets_to_workspace_root() # WRITE THE SESSION ID FILE import codecs with codecs.open(self.sessionIdFile, encoding="utf-8", mode="w") as writeFile: writeFile.write(sessionId) message = f"A new data-reduction session has been created with sessionId '{sessionId}'" try: self.log.print(message) except: print(message) self.log.debug("completed the ``session_create`` method") return sessionId
[docs] def session_list(self, silent=False): """*list the sessions available to the user* **Key Arguments:** - ``silent`` -- don't print listings if True **Return:** - ``currentSession`` -- the single ID of the currently used session - ``allSessions`` -- the IDs of the other sessions **Usage:** ```python from soxspipe.commonutils import data_organiser do = data_organiser( log=log, rootDir="." ) currentSession, allSessions = do.session_list() ``` """ self.log.debug("starting the ``session_list`` method") import codecs # IF SESSION ID FILE DOES NOT EXIST, REPORT self.sessionIdFile = self.sessionsDir + "/.sessionid" exists = os.path.exists(self.sessionIdFile) if not exists: if not silent: print("No reduction sessions exist in this workspace yet.") return None, None else: with codecs.open(self.sessionIdFile, encoding="utf-8", mode="r") as readFile: currentSession = readFile.read() # LIST ALL SESSIONS allSessions = [d for d in os.listdir(self.sessionsDir) if os.path.isdir(os.path.join(self.sessionsDir, d))] allSessions.sort() if not silent: for s in allSessions: if s == currentSession.strip(): print(f"\033[0;32m*{s}*\u001b[38;5;15m") else: print(s) self.log.debug("completed the ``session_list`` method") return currentSession, allSessions
[docs] def session_switch(self, sessionId): """*switch to an existing workspace data-reduction session* **Key Arguments:** - ``sessionId`` -- the sessionId to switch to **Usage:** ```python from soxspipe.commonutils import data_organiser do = data_organiser( log=log, rootDir="." ) do.session_switch(mySessionId) ``` """ self.log.debug("starting the ``session_switch`` method") import codecs currentSession, allSessions = self.session_list(silent=True) if sessionId == currentSession: print(f"Session '{sessionId}' is already in use.") return None elif sessionId in allSessions: # WRITE THE SESSION ID FILE with codecs.open(self.sessionIdFile, encoding="utf-8", mode="w") as writeFile: writeFile.write(sessionId) else: print(f"There is no session with the ID '{sessionId}'. List existing sessions with `soxspipe session ls`.") return None self.sessionPath = self.sessionsDir + "/" + sessionId self._symlink_session_assets_to_workspace_root() print(f"Session successfully switched to '{sessionId}'.") self.log.debug("completed the ``session_switch`` method") return None
def _symlink_session_assets_to_workspace_root(self): """*symlink session QC, product, SOF directories, database and scripts to workspace root* **Key Arguments:** # - **Return:** - None """ self.log.debug("starting the ``_symlink_session_assets_to_workspace_root`` method") import shutil import os # SYMLINK FILES AND FOLDERS toLink = ["reduced", "qc", "soxspipe.yaml", "sof", "soxspipe.log"] for l in toLink: dest = self.rootDir + f"/{l}" src = self.sessionPath + f"/{l}" try: os.symlink(src, dest) except: os.unlink(dest) os.symlink(src, dest) # REDUCTION SCRIPTS for d in os.listdir(self.sessionPath): filepath = os.path.join(self.sessionPath, d) if os.path.isfile(filepath) and os.path.splitext(filepath)[1] == ".sh": dest = self.rootDir + f"/{d}" src = filepath try: os.symlink(src, dest) except: os.unlink(dest) os.symlink(src, dest) self.log.debug("completed the ``_symlink_session_assets_to_workspace_root`` method") return None
[docs] def session_refresh(self, silent=False, failure=True): """*refresh a session's SOF files (needed if a recipe fails)* **Usage:** ```python from soxspipe.commonutils import data_organiser do = data_organiser( log=log, rootDir="." ) do.session_refresh() ``` """ self.log.debug("starting the ``session_refresh`` method") import sys import os import pandas as pd if failure is True: self.log.print("\nRefeshing SOF files due to recipe failure\n") elif failure is False: self.log.print("\nRefreshing SOF files, as a previously failed recipe is now passing.\n") import codecs # IF SESSION ID FILE DOES NOT EXIST, REPORT exists = os.path.exists(self.sessionIdFile) if not exists: if not silent: print("No reduction sessions exist in this workspace yet.") return None, None else: with codecs.open(self.sessionIdFile, encoding="utf-8", mode="r") as readFile: sessionId = readFile.read() self.sessionPath = self.sessionsDir + "/" + sessionId self.sessionPath = self.sessionsDir + "/" + sessionId self.sessionId = sessionId self.conn, reset = self._get_or_create_db_connection() # SELECT INSTR self._select_instrument() self.build_sof_files() if failure in [True, False]: sys.stdout.flush() sys.stdout.write("\x1b[1A\x1b[2K") self.log.print("SOF file refresh complete") self.log.debug("completed the ``session_refresh`` method") return reset
[docs] def close(self): """*close the database connection* **Usage:** ```python do.close() ``` """ self.log.debug("starting the ``session_refresh`` method") try: self.conn.close() except: pass self.log.debug("completed the ``session_refresh`` method") return
[docs] def use_vlt_environment_folders(self): """*use vlt environment folders* **Key Arguments:** # - **Return:** - None **Usage:** ```python usage code ``` --- ```eval_rst .. todo:: - add usage info - create a sublime snippet for usage - write a command-line tool for this method - update package tutorial with command-line tool info if needed ``` """ self.log.debug("starting the ``use_vlt_environment_folders`` method") import yaml # COLLECT ADVANCED SETTINGS parentDirectory = os.path.dirname(__file__) advs = parentDirectory + "/advanced_settings.yaml" level = 0 exists = False count = 1 while not exists and len(advs) and count < 10: count += 1 level -= 1 exists = os.path.exists(advs) if not exists: advs = "/".join(parentDirectory.split("/")[:level]) + "/advanced_settings.yaml" if not exists: advs = {} else: with open(advs, "r") as stream: advs = yaml.safe_load(stream) vltRaw = advs["vlt-data-raw"] vltReduced = advs["vlt-data-reduced"] # TEST THE VLT FOLDERS EXIST if not os.path.exists(vltRaw) or not os.path.exists(vltReduced): print( "The VLT data structure does not seem to exist on this machine. Are you sure you need to use the --vlt flag?" ) sys.exit(0) try: os.symlink(vltRaw, self.rawDir) except: os.unlink(self.rawDir) os.symlink(vltRaw, self.rawDir) self.log.debug("completed the ``use_vlt_environment_folders`` method") return vltReduced
def _fits_files_exist(self): """Check if any FITS files exist in rawDir or rootDir.""" fitsExist = False exists = os.path.exists(self.rawDir) if exists: from fundamentals.files import recursive_directory_listing theseFiles = recursive_directory_listing( log=self.log, baseFolderPath=self.rawDir, whatToList="files" # all | files | dirs ) for f in theseFiles: if os.path.splitext(f)[1] == ".fits" or ".fits.gz" in os.path.splitext(f): fitsExist = True break if not fitsExist: for d in os.listdir(self.rootDir): filepath = os.path.join(self.rootDir, d) if os.path.isfile(filepath) and ( os.path.splitext(filepath)[1] == ".fits" or ".fits.gz" in os.path.splitext(filepath) ): fitsExist = True break return fitsExist def _get_or_create_db_connection(self): """Private method to get or create the SQLite database connection, copying the template if missing.""" import shutil import sqlite3 as sql import time reset = False conn = None i = 0 tries = 50 while i < tries: if not conn: try: if self.conn: conn = self.conn except: pass if not conn: try: with open(self.rootDbPath): pass self.freshRun = False except IOError: self.freshRun = True emptyDb = os.path.dirname(os.path.dirname(__file__)) + "/resources/soxspipe.db" shutil.copyfile(emptyDb, self.rootDbPath) conn = sql.connect(self.rootDbPath, timeout=300, autocommit=True, check_same_thread=False) c = conn.cursor() try: c.execute("PRAGMA integrity_check;") c.execute("PRAGMA busy_timeout = 100000") c.execute("PRAGMA synchronous = OFF") this = c.fetchall() i = tries + 1 except Exception as e: # DATABASE IS BROKEN, REPLACE WITH EMPTY ONE i += 1 c.close() conn.close() try: del conn del self.conn except: pass time.sleep(1) if i > tries - 1: self.prepare(refresh=True) if not reset: reset = True conn = sql.connect(self.rootDbPath, timeout=300, autocommit=True, check_same_thread=False) c = conn.cursor() c.execute("PRAGMA busy_timeout = 100000") c.execute("PRAGMA synchronous = OFF") c.close() return conn, reset # METHOD TO RETURN ALL THE RAW FRAME SETS THAT ARE NOT COMPLETE FROM THE DATABASE AS A PANDAS TABLE
[docs] def get_incomplete_raw_frames_set(self): import pandas as pd query = 'select distinct "eso seq arm", round("mjd-obs",1) as "mjd-obs", "eso dpr tech","eso dpr type","slit","eso obs name","eso obs id" from raw_frame_sets where complete = 0 and recipe in ("nod_obj","stare_obj","offset_obj")' return pd.read_sql(query, con=self.conn)
def _select_instrument(self, inst=False): """Select the instrument and set related attributes.""" from soxspipe.commonutils import keyword_lookup import yaml if inst: self.instrument = inst else: try: c = self.conn.cursor() sqlQuery = "select instrume from raw_frames where instrume is not null limit 1" c.execute(sqlQuery) self.instrument = c.fetchall()[0][0] c.close() except: return if "SOXS" not in self.instrument.upper(): self.instrument = "XSH" # SETUP THE KEYWORD LOOKUP FUNCTION self.kw = keyword_lookup(log=self.log, instrument=self.instrument).get # SETUP SOF MAP yamlFilePath = ( os.path.dirname(os.path.dirname(__file__)) + "/resources/" + self.instrument.lower() + "_sof_map.yaml" ) # YAML CONTENT TO DICTIONARY with open(yamlFilePath, "r") as stream: self.sofMapLookup = yaml.safe_load(stream) def _flag_files_to_ignore(self): """*Flag files to ignore based on settings and reduction order*""" import sqlite3 as sql # FLAG FILES TO IGNORE BASED ON REDUCTION ORDER c = self.conn.cursor() theseKeywords = "','".join(self.reductionOrder) sqlQuery = f"update raw_frames set ignore = 1 WHERE `eso dpr type` not in ('{theseKeywords}')" c.execute(sqlQuery) self.conn.commit() # FLAG SIMULATION FILES TO IGNORE sqlQuery = f"update raw_frames set ignore = 1 WHERE `eso dpr type` not like '%OBJECT%' and `eso dpr type` not like '%STD%' and simulation = 1" c.execute(sqlQuery) self.conn.commit() # FLAG DFLATS TO IGNORE IF SPECIFIED IN SETTINGS if "ignore-dflats" in self.settings and self.settings["ignore-dflats"]: sqlQuery = "update raw_frames set ignore = 1 WHERE `eso dpr type` like '%DFLAT%'" c.execute(sqlQuery) self.conn.commit() # FLAG DOME FLATS TO IGNORE IF SPECIFIED IN SETTINGS if "ignore-dome-flats" in self.settings and self.settings["ignore-dome-flats"]: sqlQuery = "update raw_frames set ignore = 1 WHERE `eso dpr type` like '%DOME,FLAT%'" c.execute(sqlQuery) self.conn.commit() sqlQueries = ["update raw_frames set ignore = 1 WHERE `slit` = 'UNDEFINED'"] sqlQueries.append("update raw_frames set ignore = 1 WHERE `eso dpr type` like '%FLAT%' and `slit` = 'BLANK'") sqlQueries.append("update raw_frames set ignore = 1 WHERE `eso seq arm` = 'ACQ'") for sqlQuery in sqlQueries: c.execute(sqlQuery) self.conn.commit() c.close()
[docs] def build_sof_files(self): """*scan the raw frame table to generate the listing of products that are expected to be created and then write out all of the needed SOF files* **Usage:** ```python self.build_sof_files() ``` """ self.log.debug("starting the ``_populate_product_frames_db_table`` method") import pandas as pd c = self.conn.cursor() sqlQuery = f"update product_frames set status = status_{self.sessionId};" c.execute(sqlQuery) sqlQuery = f"update raw_frames set processed = 0 where processed < 0;" c.execute(sqlQuery) # CLEAN UP FAILED FILES # DELETE FROM count = 0 oldCount = -1 while count != oldCount: oldCount = count c = self.conn.cursor() sqlQuery = f"select distinct sof from sof_map where filepath in ( select p.filepath from sof_map s, product_frames p where p.filepath=s.filepath and (p.status = 'fail' or p.complete < 1));" compromisedSofs = pd.read_sql(sqlQuery, con=self.conn)["sof"].tolist() count = len(compromisedSofs) sqlQuery = f"update product_frames set complete = 0 where (status != 'fail' or status is null) and sof in (select distinct sof from sof_map where filepath in ( select p.filepath from sof_map s, product_frames p where p.filepath=s.filepath and (p.status = 'fail' or p.complete < 1)));" c.execute(sqlQuery) sqlQueries = [ f"update raw_frames set processed = 0 where file in (select file from sof_map where sof in (select distinct sof from sof_map where filepath in ( select p.filepath from sof_map s, product_frames p where p.filepath=s.filepath and (p.status = 'fail' or p.complete < 1))));", f"update raw_frame_sets set complete = 0 where sof in (select distinct sof from sof_map where filepath in ( select p.filepath from sof_map s, product_frames p where p.filepath=s.filepath and (p.status = 'fail' or p.complete < 1)));", f"delete from sof_map_{self.sessionId} where sof in ( select s.sof from sof_map s, product_frames p where p.filepath=s.filepath and (p.status = 'fail' or p.complete < 1));", f"update raw_frames set processed = -1 where file in (select distinct s.file from sof_map s, product_frames p where p.sof=s.sof and p.status = 'fail');", "update raw_frames set lamp = null, slit = null, slitmask = null where `eso dpr type` in ('BIAS','DARK');", "update raw_frames set rospeed = null where rospeed = -1;", """WITH s AS ( SELECT uuid, "file", "eso tpl start", "eso tpl expno" AS expno, CASE WHEN LAG("eso tpl expno") OVER (ORDER BY "eso tpl start", "eso tpl expno") IS NULL THEN 1 WHEN "eso tpl expno" <= LAG("eso tpl expno") OVER (ORDER BY "eso tpl start", "eso tpl expno") THEN 1 WHEN "eso tpl name" != LAG("eso tpl name") OVER (ORDER BY "eso tpl start", "eso tpl expno") THEN 1 ELSE 0 END AS new_set FROM raw_frames ), g AS ( SELECT uuid, "file", "eso tpl start", expno, SUM(new_set) OVER (ORDER BY "eso tpl start", expno ROWS UNBOUNDED PRECEDING) AS grp FROM s ), -- Combine labeling and sizing in one pass set_info AS ( SELECT grp, FIRST_VALUE("file") OVER (PARTITION BY grp ORDER BY "eso tpl start", expno) AS set_file, COUNT(*) OVER (PARTITION BY grp) AS set_size FROM g ), -- Create final mapping final_mapping AS ( SELECT DISTINCT g.uuid, si.set_file, si.set_size FROM g JOIN set_info si ON g.grp = si.grp ) UPDATE raw_frames SET set_first_file = fm.set_file, set_size = fm.set_size FROM final_mapping fm WHERE raw_frames.uuid = fm.uuid; """, ] for sqlQuery in sqlQueries: c = self.conn.cursor() c.execute(sqlQuery) c.close() # DELETE COMPROMISED SOF FILES for sof in compromisedSofs: sofPath = self.sessionPath + "/sof/" + sof try: os.remove(sofPath) except: pass # RESET ALL PRODUCTS TO INCOMPLETE c = self.conn.cursor() allRawGroups = [] for recipeOrder, (name, filters) in enumerate(self.sofMapLookup.items()): # READ FILTER VARIABLES ttypes = filters["eso dpr type"] tech = filters["eso dpr tech"] productTypes = filters["products"] or [] recipe = filters["recipe"] if "calibrations" in filters: calibrationTypes = filters["calibrations"] else: calibrationTypes = [] for ttype in ttypes: rawFrames, rawGroups = self.get_raw_frames_and_groups( ttype=ttype, tech=tech, recipe=recipe, recipeOrder=recipeOrder, filterName=name, unprocessedOnly=True, ) # ADD PREDICTED PRODUCT TO PRODUCT TABLE - DETERMINE IF COMPLETE LATER incompleteProducts = self.predict_product_frames(productTypes, rawGroups, recipe) if not incompleteProducts: continue allRawGroups.append(rawGroups) if not len(calibrationTypes): # MBIAS AND MDARK -- ALWAYS COMPLETE (NO PRIOR CALIBRATION REQUIRED) sqlQuery = f"select sof from product_frames where recipe = '{recipe}' and complete = 0;" containerSofs = pd.read_sql(sqlQuery, con=self.conn)["sof"].tolist() self.raw_frames_to_sof_map(rawGroups=rawGroups, containerSofs=containerSofs) sqlQuery = f"update product_frames set complete = 1 where recipe = '{recipe}' and complete = 0;" c.execute(sqlQuery) else: if isinstance(calibrationTypes, dict): for arm, calType in calibrationTypes.items(): if "STD" in ttype: extraType = f"""AND "eso dpr type" = '{ttype}'""" else: extraType = "" exists = " AND ".join( [ f"""EXISTS ( SELECT 1 FROM cal_{ct} WHERE cal_{ct}.sof = p.sof AND (cal_{ct}.upstream_status = 'pass' OR cal_{ct}.upstream_status IS NULL) )""" for ct in calType ] ) sqlQuery = f"""select sof from product_frames_plus as p WHERE complete < 1 AND recipe = '{recipe}' AND "eso seq arm" = '{arm}' {extraType} AND {exists};""" containerSofs = pd.read_sql(sqlQuery, con=self.conn)["sof"].tolist() self.raw_frames_to_sof_map(rawGroups=rawGroups, containerSofs=containerSofs) sqlQuery = f"""UPDATE product_frames as p SET complete = -1 WHERE complete < 1 And sof in ("{'","'.join(containerSofs)}")""" c.execute(sqlQuery) # FOR COMPLETE PRODUCTS, ADD CALIBRATION FILES TO SOF MAP # NEED TO ALSO ADD THE RAW FILES TOO ... ADD RAW FRAMES, SET COMPLETE = 1 WHERE PRODUCT FRAMES COMPLETE = 1 for ct in calType: sqlQuery = f"""select cal_{ct}.file, cal_{ct}.upstream_tag as tag, product_frames.sof, cal_{ct}.filepath, product_frames.complete from product_frames, cal_{ct} where product_frames.complete = -1 and product_frames.sof=cal_{ct}.sof;""" newSof = pd.read_sql(sqlQuery, con=self.conn) if len(newSof): self._dataframe_to_sqlite(newSof, f"sof_map_{self.sessionId}", replace=False) sqlQuery = f"""update product_frames set complete = 1 where complete = -1;""" c.execute(sqlQuery) self.conn.commit() c.close() # CONCAT ALL RAW GROUPS if len(allRawGroups): rawGroups = pd.concat(allRawGroups, ignore_index=True) if len(rawGroups.index): rawGroups = rawGroups.drop(columns=["filepaths"]) self._dataframe_to_sqlite(rawGroups, "raw_frame_sets", replace=False) c = self.conn.cursor() sqlQueries = [ f"UPDATE sof_map_{self.sessionId} SET complete = 1 WHERE complete = -1;", "UPDATE raw_frame_sets SET complete = 1 WHERE sof IN (SELECT r.sof FROM raw_frame_sets r JOIN product_frames p ON p.sof = r.sof WHERE p.complete = 1);", "UPDATE raw_frame_sets SET complete = 0 WHERE sof IN (SELECT r.sof FROM raw_frame_sets r JOIN product_frames p ON p.sof = r.sof WHERE p.complete = 0);", "UPDATE raw_frames SET processed = 1 WHERE processed = 0 AND filepath IN (SELECT filepath FROM sof_map);", "UPDATE product_frames SET set_first_file = (SELECT s.file FROM sof_map s WHERE s.sof = product_frames.sof AND s.file LIKE 'SOXS%' LIMIT 1) WHERE set_first_file IS NULL;", "UPDATE raw_frame_sets SET set_first_file = (SELECT s.file FROM sof_map s WHERE s.sof = raw_frame_sets.sof AND s.file LIKE 'SOXS%' LIMIT 1) WHERE set_first_file IS NULL;", "UPDATE product_frames SET absrot=(SELECT r.absrot FROM raw_frames r WHERE r.file=product_frames.set_first_file);", "UPDATE raw_frame_sets SET absrot=(SELECT r.absrot FROM raw_frames r WHERE r.file=raw_frame_sets.set_first_file);", ] for sqlQuery in sqlQueries: c.execute(sqlQuery) self.conn.commit() c.close() self._write_sof_files() return
[docs] def get_raw_frames_and_groups( self, ttype=None, arm=None, tech=None, recipe=None, recipeOrder=None, filterName=None, unprocessedOnly=False ): """*Process raw frames to group and calculate mean MJD values.* **Key Arguments:** - ``ttype`` -- optional data product `eso dpr type` to filter by - ``arm`` -- optional instrument `eso seq arm` to filter by - ``tech`` -- optional list of `eso dpr tech` to filter by - ``recipe`` -- recipe name to assign to groups - ``recipeOrder`` -- recipe reduction order - ``filterName`` -- optional name to filter the groups - ``unprocessedOnly`` -- if True, only return unprocessed raw frames **Return:** - `rawFrames` -- processed raw frames dataframe - `rawGroups` -- grouped raw frames with calculated MJD values **Usage:** ```python rawFrames, rawGroups = self.get_raw_frames_and_groups() ``` """ import pandas as pd import numpy as np # IF NONE, SET TO EMPTY STRING ttype, arm, tech = ttype or "", arm or "", tech or "" if ttype or arm: where = "where" else: where = "" if ttype: ttype = "and `eso dpr type` = '" + ttype + "'" if arm: arm = "and `eso seq arm` = '" + arm + "'" if tech: # JOIN ITEMS IN TECH LIST TO A COMMA-SEPARATED STRING tech = "and `eso dpr tech` in (" + ",".join(["'" + t + "'" for t in tech]) + ")" if unprocessedOnly: where = where + " and processed = 0" # READ IN RAW FRAMES TABLE conn = self.conn rawFrames = pd.read_sql( f"SELECT * FROM raw_frames_valid {where} {ttype} {arm} {tech} order by `mjd-obs` asc".replace( "where and", "where" ), con=conn, ) rawFrames = rawFrames.astype( { "exptime": float, "gain": float, "ra": float, "dec": float, "eso tel parang end": float, "eso tel parang start": float, "eso tel az": float, "eso tel alt": float, "eso tel ambi fwhm end": float, "eso tel ambi fwhm start": float, "eso tel airm end": float, "eso tel airm start": float, "absrot": float, "nir temp k": float, "vis temp c": float, "cp temp c": float, "afc1 pos1": float, "afc1 pos2": float, "afc2 pos1": float, "afc2 pos2": float, } ) rawFrames.fillna( { "exptime": -99.99, "gain": -99.99, "ra": -99.99, "dec": -99.99, "eso tel parang end": -99.99, "eso tel parang start": -99.99, "eso tel az": -99.99, "eso tel alt": -99.99, "eso tel ambi fwhm end": -99.99, "eso tel ambi fwhm start": -99.99, "eso tel airm end": -99.99, "eso tel airm start": -99.99, "absrot": -99.99, "nir temp k": -99.99, "vis temp c": -99.99, "cp temp c": -99.99, "afc1 pos1": -99.99, "afc1 pos2": -99.99, "afc2 pos1": -99.99, "afc2 pos2": -99.99, }, inplace=True, ) rawFrames.fillna("--", inplace=True) filterKeywordsRaw = self.filterKeywords[:] for i in self.proKeywords: filterKeywordsRaw.remove(i) # HIDE OFF FRAMES FROM GROUPS mask = ( (rawFrames["eso dpr tech"] == "IMAGE") & (rawFrames["eso seq arm"] == "NIR") & (rawFrames["eso dpr type"] != "DARK") ) rawFramesNoOffFrames = rawFrames.loc[~mask] if not len(rawFramesNoOffFrames.index): return pd.DataFrame(), pd.DataFrame() rawGroups = self._group_raw_frames(rawFramesNoOffFrames, filterKeywordsRaw + ["set_first_file"]) # REMOVE GROUPED STARE - NEED TO ADD INDIVIDUAL FRAMES TO GROUP mask = rawGroups["eso dpr tech"].isin(["ECHELLE,SLIT,STARE"]) rawGroups = rawGroups.loc[~mask] # NOW ADD SCIENCE FRAMES AS ONE ENTRY PER EXPOSURE rawScienceFrames = rawFrames.loc[rawFrames["eso dpr tech"].isin(["ECHELLE,SLIT,STARE"])] if len(rawScienceFrames.index): rawScienceFrames = self._group_raw_frames(rawScienceFrames, filterKeywordsRaw + ["mjd-obs"]) # MERGE DATAFRAMES rawGroups = pd.concat([rawGroups, rawScienceFrames], ignore_index=True) # REMOVE GROUPED SINGLE PINHOLE ARCS - NEED TO ADD INDIVIDUAL FRAMES TO GROUP mask = rawGroups["eso dpr tech"].isin(["ECHELLE,PINHOLE", "ECHELLE,MULTI-PINHOLE"]) rawGroups = rawGroups.loc[~mask] # NOW ADD PINHOLE FRAMES AS ONE ENTRY PER EXPOSURE if self.instrument.upper() == "SOXS": rawPinholeFrames = rawFrames.loc[ (rawFrames["eso dpr tech"].isin(["ECHELLE,PINHOLE", "ECHELLE,MULTI-PINHOLE"])) & ( (rawFrames["eso seq arm"] == "NIR") | (~rawFrames["lamp"].isin(["Xe", "Ar", "Hg", "Ne", "ArNeHgXe"])) ) ] else: rawPinholeFrames = rawFrames.loc[ rawFrames["eso dpr tech"].isin(["ECHELLE,PINHOLE", "ECHELLE,MULTI-PINHOLE"]) ] if len(rawPinholeFrames.index): rawPinholeFrames = self._group_raw_frames(rawPinholeFrames, filterKeywordsRaw + ["mjd-obs"]) # MERGE DATAFRAMES rawGroups = pd.concat([rawGroups, rawPinholeFrames], ignore_index=True) rawGroups["recipe"] = recipe if "STD,FLUX" in ttype: recipe = recipe.replace("_std", "") + "_std_flux" if "STD,TELLURIC" in ttype: recipe = recipe.replace("_std", "") + "_std_tell" rawGroups["sof"] = ( rawGroups["date-obs"].astype(str) + "_" + rawGroups["eso seq arm"].astype(str) + "_" + rawGroups["binning"].astype(str) + "_" + rawGroups["rospeed"].astype(str) + "_" + recipe + "_" + rawGroups["lamp"].astype(str) + "_" + rawGroups["slit"].astype(str) + "_" + rawGroups["exptime"].astype(str) + "s" + "_" + rawGroups["instrume"].astype(str) ) rawGroups["sof"] = rawGroups["sof"].str.upper() if "science" in filterName.lower(): rawGroups["sof"] += "_" + rawGroups["object"].astype(str).replace("-", "_") for _ in range(5): rawGroups["sof"] = rawGroups["sof"].str.replace("_--_", "_", regex=False) rawGroups["sof"] = rawGroups["sof"].str.replace(" ", "", regex=False) rawGroups["sof"] = rawGroups["sof"].str.replace(".", "_", regex=False) rawGroups["sof"] = rawGroups["sof"].str.replace("__", "_", regex=False) rawGroups["sof"] += ".sof" rawGroups["sof"] = rawGroups["sof"].str.replace("MBIAS_0_0S_", "MBIAS_") rawGroups["sof"] = rawGroups["sof"].str.replace("DISP_SOLUTION.*?_PINHOLE", "DSOL_PINHOLE", regex=True) rawGroups["sof"] = rawGroups["sof"].str.replace("SPAT_SOLUTION.*?_MULTPIN", "SSOL_MULTPIN", regex=True) rawGroups["sof"] = rawGroups["sof"].str.replace("ORDER_CENTRES", "OLOC", regex=True) if recipe in ["mbias", "mdark"]: rawGroups["complete"] = 1 else: rawGroups["complete"] = 0 rawGroups["recipe_order"] = recipeOrder # FILTER DATA FRAME # FIRST CREATE THE MASK mask = (rawGroups["recipe"].isin(("mbias", "mdark"))) & (rawGroups["counts"] < rawGroups["eso tpl nexp"]) mask = mask | ((rawGroups["recipe"] == "mflat") & (rawGroups["counts"] < 5)) rawGroups = rawGroups.loc[~mask] return rawFrames, rawGroups
def _group_raw_frames(self, rawFrames, filterKeywordsRaw, addFilepaths=True, addStartDate=True): """Group raw frames and return grouped rows with aggregation metadata.""" import pandas as pd # Create aggregation dictionary agg_dict = {col: "mean" for col in self.filterKeywordsExtras if col not in filterKeywordsRaw} agg_dict["file"] = "size" # for counting rows if "set_first_file" in filterKeywordsRaw: filterKeywordsRaw.remove("set_first_file") rawGroups = rawFrames.groupby(filterKeywordsRaw) if addFilepaths: filepaths = [list(group["filepath"].values) for name, group in rawGroups] if addStartDate: startTime = rawGroups.min()["date-obs"].values # Group and aggregate rawGroups = rawFrames.groupby(filterKeywordsRaw).agg(agg_dict).rename(columns={"file": "counts"}).reset_index() rawGroups.style.hide(axis="index") pd.options.mode.chained_assignment = None # Normalise timestamps to compact YYYYMMDDTHHMMSS format for SOF naming. if addFilepaths: rawGroups["filepaths"] = filepaths if addStartDate: startTime = [str(s).split(".")[0].replace("-", "").replace(":", "") for s in startTime] rawGroups["date-obs"] = startTime return rawGroups
[docs] def predict_product_frames(self, productTypes, rawGroups, recipe): """ Process product frames for a given set of product types and raw groups. **Key Arguments:** - `productTypes` -- List of product types to process. - `rawGroups` -- DataFrame containing raw groups. - `recipe` -- Recipe name. **Return:** - `incompleteProducts` -- Number of incomplete products. """ import pandas as pd if not len(rawGroups.index): sqlQuery = f"select count(*) from product_frames where recipe = '{recipe}' and complete< 1;" c = self.conn.cursor() c.execute(sqlQuery) incompleteProducts = c.fetchall()[0][0] c.close() return incompleteProducts for product in productTypes: proKeys = list(product.values())[0] product = list(product.keys())[0] productFrames = rawGroups.copy() productFrames.drop( columns=[ "eso dpr catg", "eso dpr tech", "eso dpr type", "counts", "complete", "date-obs", "filepaths", ], inplace=True, ) productFrames["eso pro type"] = proKeys["eso pro type"] productFrames["eso pro tech"] = proKeys["eso pro tech"] productFrames["eso pro catg"] = proKeys["eso pro catg"] productFrames["eso pro catg"] = ( productFrames["eso pro catg"].astype(str) + "_" + productFrames["eso seq arm"].astype(str).str.upper() ) if product in ["fits image", "fits table"]: productFrames["file"] = productFrames["sof"].str.replace(".sof", f".fits") if "replace" in proKeys: for item in proKeys["replace"]: productFrames["file"] = productFrames["file"].str.replace(item["from"], item["to"]) else: productFrames["file"] = "XXXX" productFrames["filepath"] = ( "./reduced/" + productFrames["night start date"].astype(str) + "/soxs-" + recipe.replace("_", "-") + "/" + productFrames["file"].astype(str) ) self._dataframe_to_sqlite(productFrames, "product_frames") return 1
[docs] def raw_frames_to_sof_map(self, rawGroups, containerSofs): """ Generate the SOF map from raw groups and complete product SOFs. **Key Arguments:** - `rawGroups` -- DataFrame containing raw frame groups. - `containerSofs` -- array of complete product SOFs. **Return:** - `sofMapDF` -- DataFrame containing the generated SOF map. """ import pandas as pd if not len(rawGroups.index): return # BUILD SOF MAP TABLE mask = rawGroups["sof"].isin(containerSofs) sofMapDF = rawGroups.loc[mask] sofMapDF = sofMapDF.explode("filepaths") sofMapDF["file"] = sofMapDF["filepaths"].apply(lambda x: os.path.basename(x) if pd.notnull(x) else x) sofMapDF = sofMapDF.rename(columns={"filepaths": "filepath"}) sofMapDF["tag"] = sofMapDF["eso dpr type"].replace(",", "_") + "_" + sofMapDF["eso seq arm"] sofMapDF = sofMapDF[["file", "tag", "sof", "filepath", "complete"]] sofMapDF["complete"] = 1 self._dataframe_to_sqlite(sofMapDF, f"sof_map_{self.sessionId}", replace=False) # UPDATE RAW FRAMES AS PROCESSED processedRawFiles = sofMapDF["file"].unique().tolist() if len(processedRawFiles): c = self.conn.cursor() placeholders = ",".join(["?"] * len(processedRawFiles)) sqlQuery = f"update raw_frames set processed=1 where file in ({placeholders});" c.execute(sqlQuery, processedRawFiles) self.conn.commit() c.close() return
def _dataframe_to_sqlite(self, dataframe, table_name, replace=False): """ Retry inserting into the database with a maximum of 7 attempts. **Key Arguments:** - `dataframe` -- DataFrame containing rows to insert. - `table_name` -- Name of the database table to insert into. - `replace` -- If True, replace existing entries; otherwise, append. **Raises:** - Exception if the insertion fails after 7 attempts. """ import time if replace: c = self.conn.cursor() sqlQuery = f"delete from {table_name};" try: c.execute(sqlQuery) except: pass c.close() keepTrying = 0 while keepTrying < 7: try: dataframe.replace(["--", -99.99], None).to_sql( table_name, con=self.conn, index=False, if_exists="append" ) keepTrying = 10 except Exception as e: if keepTrying > 5: raise Exception(e) time.sleep(1) keepTrying += 1
def _harvest_fits_headers(batch, log, pathToDirectory, keywords, filterKeys, instrument, kw): from ccdproc import ImageFileCollection import numpy as np from astropy.time import Time, TimeDelta masterTable = ImageFileCollection(filenames=batch, keywords=keywords) masterTable = masterTable.summary # ADD FILLED VALUES FOR MISSING CELLS for fil in keywords: if fil in filterKeys and fil not in ["exptime"]: try: masterTable[fil].fill_value = "--" except: masterTable.replace_column(fil, masterTable[fil].astype(str)) masterTable[fil].fill_value = "--" # elif fil in ["exptime"]: # masterTable[fil].fill_value = "--" else: try: masterTable[fil].fill_value = -99.99 except: masterTable[fil].fill_value = "--" masterTable = masterTable.filled() # FIX ACQ CAM EXPTIME & ARM & FILTER if "SOXS" in instrument.upper(): matches = (masterTable["exptime"] == -99.99) & (masterTable[kw("EXPTIME2").lower()] != -99.99) masterTable["exptime"][matches] = masterTable[kw("EXPTIME2").lower()][matches] matches = (masterTable["eso seq arm"] == "--") & (masterTable[kw("DET").lower()] == "ACQ") masterTable["eso seq arm"][matches] = "ACQ" matches = (masterTable["eso seq arm"] != "ACQ") & (masterTable[kw("ACFW_ID").lower()] != "--") masterTable[kw("ACFW_ID").lower()][matches] = "--" matches = (masterTable["eso seq arm"] == "ACQ") & ( (masterTable["eso dpr type"] == "BIAS") | (masterTable["eso dpr type"] == "DARK") ) masterTable[kw("ACFW_ID").lower()][matches] = "--" # FILTER OUT FRAMES WITH NO MJD matches = ( (masterTable["mjd-obs"] == -99.99) | (masterTable["eso dpr catg"] == "--") | (masterTable["eso dpr tech"] == "--") | (masterTable["eso dpr type"] == "--") | (masterTable["exptime"] == -99.99) ) missingMJDFiles = masterTable["file"][matches] if len(missingMJDFiles): print("\nThe following FITS files are missing DPR keywords and will be ignored:\n\n") print(missingMJDFiles) masterTable = masterTable[~matches] # SETUP A NEW COLUMN GIVING THE INT MJD THE CHILEAN NIGHT BEGAN ON # 12:00 NOON IN CHILE IS TYPICALLY AT 16:00 UTC (CHILE = UTC - 4) # SO COUNT CHILEAN OBSERVING NIGHTS AS 15:00 UTC-15:00 UTC (11am-11am) if "mjd-obs" in masterTable.colnames: chile_offset = TimeDelta(4.0 * 60 * 60, format="sec") night_start_offset = TimeDelta(15.0 * 60 * 60, format="sec") mjd_ofset = TimeDelta(12.0 * 60 * 60, format="sec") masterTable["mjd-obs"] = masterTable["mjd-obs"].astype(float) chileTimes = Time(masterTable["mjd-obs"], format="mjd", scale="utc") - chile_offset startNightDate = Time(masterTable["mjd-obs"], format="mjd", scale="utc") - night_start_offset # masterTable["utc-4hrs"] = (masterTable["mjd-obs"] - 2 / 3).astype(int) mjdDate = Time(masterTable["mjd-obs"], format="mjd", scale="utc") - mjd_ofset masterTable["mjd-date"] = mjdDate.strftime("%Y-%m-%d") masterTable["utc-4hrs"] = chileTimes.strftime("%Y-%m-%dt%H:%M:%S") masterTable["night start date"] = startNightDate.strftime("%Y-%m-%d") masterTable["night start mjd"] = startNightDate.mjd.astype(int) masterTable.add_index("night start date") masterTable.add_index("night start mjd") masterTable.add_index("mjd-date") if instrument.upper() != "SOXS": if kw("DET_READ_SPEED").lower() in masterTable.colnames: masterTable["rospeed"] = np.copy(masterTable[kw("DET_READ_SPEED").lower()]) try: masterTable["rospeed"][masterTable["rospeed"] == -99.99] = "--" except: masterTable["rospeed"] = masterTable["rospeed"].astype(str) masterTable["rospeed"][masterTable["rospeed"] == -99.99] = "--" masterTable["rospeed"][masterTable["rospeed"] == "1pt/400k/lg"] = "fast" masterTable["rospeed"][masterTable["rospeed"] == "1pt/400k/lg/AFC"] = "fast" masterTable["rospeed"][masterTable["rospeed"] == "1pt/100k/hg"] = "slow" masterTable["rospeed"][masterTable["rospeed"] == "1pt/100k/hg/AFC"] = "slow" masterTable.add_index("rospeed") else: if kw("DET_READ_SPEED").lower() in masterTable.colnames: masterTable["rospeed"] = np.copy(masterTable[kw("DET_READ_SPEED").lower()]) try: masterTable["rospeed"][masterTable["rospeed"] == -99.99] = -1 except: masterTable["rospeed"] = masterTable["rospeed"].astype(str) masterTable["rospeed"][masterTable["rospeed"] == -99.99] = -1 masterTable.add_index("rospeed") if kw("TPL_ID").lower() in masterTable.colnames: masterTable["template"] = np.copy(masterTable[kw("TPL_ID").lower()]) if kw("ACFW_ID").lower() in masterTable.colnames: masterTable["filter"] = np.copy(masterTable[kw("ACFW_ID").lower()]) if "naxis" in masterTable.colnames: masterTable["table"] = np.copy(masterTable["naxis"]).astype(str) masterTable["table"][masterTable["table"] == "0"] = "T" masterTable["table"][masterTable["table"] != "T"] = "F" if kw("WIN_BINX").lower() in masterTable.colnames: masterTable["binning"] = np.core.defchararray.add( masterTable[kw("WIN_BINX").lower()].astype("int").astype("str"), "x" ) masterTable["binning"] = np.core.defchararray.add( masterTable["binning"], masterTable[kw("WIN_BINY").lower()].astype("int").astype("str") ) masterTable["binning"][masterTable["binning"] == "-99x-99"] = "--" masterTable["binning"][masterTable["binning"] == "1x-99"] = "--" masterTable.add_index("binning") if kw("ABSROT").lower() in masterTable.colnames: masterTable["absrot"] = masterTable[kw("ABSROT").lower()].astype(float) masterTable.add_index("absrot") # ADD INDEXES ON ALL KEYS for k in keywords: try: masterTable.add_index(k) except: pass # SORT IMAGE COLLECTION masterTable.sort( [ "eso pro type", "eso seq arm", "eso dpr catg", "eso dpr tech", "eso dpr type", "eso pro catg", "eso pro tech", "mjd-obs", ] ) rawFrames = masterTable.to_pandas(index=False) # ADD FILEPATHS IF IN ./raw/ FOLDER rawFrames["filepath"] = "--" rawFrames["file"] = ( rawFrames["file"].astype(str).str.replace(r"^.*?(raw/\d{4}-\d{2}-\d{2}.*)$", r"./\1", regex=True) ) mask = rawFrames["file"].str.contains(r"\.\/raw\/\d{4}\-\d{2}\-\d{2}.*$", regex=True, na=False) rawFrames.loc[mask, "filepath"] = rawFrames.loc[mask, "file"] # MAKE FILE NAME ONLY THE BASENAME IF IN ./raw/ FOLDER rawFrames.loc[mask, "file"] = rawFrames.loc[mask, "file"].apply(lambda x: os.path.basename(x)) return rawFrames