Source code for soxspipe.cl_utils

#!/usr/bin/env python
# encoding: utf-8
"""
Documentation for soxspipe can be found here: http://soxspipe.readthedocs.org

Usage:
    soxspipe --version
    soxspipe prep [<workspaceDirectory> --vlt --refresh]
    soxspipe [-qwpVm] reduce all [<workspaceDirectory> -b <batchSize> -s <pathToSettingsFile>]
    soxspipe [-qxV] reduce sof <sofFile> [<workspaceDirectory> -s <pathToSettingsFile>]
    soxspipe session ((ls|new|<sessionId>)|new <sessionId>)
    soxspipe list (ob|sof) [<workspaceDirectory> -s <pathToSettingsFile>]
    soxspipe raw sof <sofFile> [<workspaceDirectory> -s <pathToSettingsFile>]
    soxspipe [-Vxd] mdark <inputFrames> [-o <outputDirectory> -s <pathToSettingsFile>]
    soxspipe [-Vxd] mbias <inputFrames> [-o <outputDirectory> -s <pathToSettingsFile>]
    soxspipe [-Vxd] disp_solution <inputFrames> [-o <outputDirectory> -s <pathToSettingsFile> --poly=<ooww>]
    soxspipe [-Vxd] order_centres <inputFrames> [-o <outputDirectory> -s <pathToSettingsFile> --poly=<ooww>]
    soxspipe [-Vxd] mflat <inputFrames> [-o <outputDirectory> -s <pathToSettingsFile>]
    soxspipe [-Vxd] spat_solution <inputFrames> [-o <outputDirectory> -s <pathToSettingsFile> --poly=<oowwss>]
    soxspipe [-Vxd] stare <inputFrames> [-o <outputDirectory> -s <pathToSettingsFile>]
    soxspipe [-Vxd] nod <inputFrames> [-o <outputDirectory> -s <pathToSettingsFile>]
    soxspipe [-Vxd] offset <inputFrames> [-o <outputDirectory> -s <pathToSettingsFile>]
    soxspipe watch (start|stop|status) [-s <pathToSettingsFile>]

Options:
    list ob                                list all observations within the workspace
    list sof                               list all science object SOF files within the workspace
    prep                                   prepare a folder of raw data (workspace) for data reduction
    session ls                             list all available data-reduction sessions in the workspace
    session new [<sessionId>]              start a new data-reduction session, optionally give a name up to 16 characters A-Z, a-z, 0-9 and/or _-
    session <sessionId>                    use an existing data-reduction session (use `session ls` to see all IDs)
    reduce all                             reduce all of the data in a workspace.
    reduce sof                             reduce a single science object SOF file.
    raw sof                                export all the raw frames needed to reduce a science object SOF file to a directory called `exported` in the current working directory.

    mbias                                  the master bias recipe
    mdark                                  the master dark recipe
    mflat                                  the master flat recipe
    disp_solution                          the disp solution recipe
    order_centres                          the order centres recipe
    spat_solution                          the spatial solution recipe
    stare                                  reduce stare mode science frames
    nod                                    reduce nodding mode science frames
    offset                                 reduce offset mode science frames

    start                                   start the watch daemon
    stop                                    stop the watch daemon
    status                                  print the status of the watch daemon

    inputFrames                            path to a directory of frames or a set-of-files file

    -b, --batch                            reduce data in batches of <batchSize> recipes (only when reducing all data)
    -d, --debug                            show debugging plots
    -h, --help                             show this help message
    -m, --multiprocess                     run reductions of recipe in parallel (experimental, use with caution and check your results carefully if using this flag)
    -p, --prep                             prepare a workspace before reducing data
    -q, --quitOnFail                       stop the pipeline if a recipe fails
    -r, --refresh                          trigger a complete refresh the workspace during preparation (delete database and do a complete prepare)
    -s, --settings <pathToSettingsFile>    the settings file
    -v, --version                          show version
    -V, --verbose                          more verbose output
    -w, --watch                            watch the workspace and reduce new raw data as it is added (similar to 'watch' mode but runs in the foreground)
    -x, --overwrite                        more verbose output
    --poly=<ORDERS>                        polynomial degrees (overrides parameters found in setting file). oowwss = order_x,order_y,wavelength_x,wavelength_y,slit_x,slit_y e.g. 345435. od = order,dispersion-axis
    --vlt                                  only use this flag if setting up a workspace on a VLT environment workstation
"""

################# GLOBAL IMPORTS ####################
import time
import os
import sys
import readline
import glob
from docopt import docopt
from fundamentals import tools, times
from subprocess import Popen, PIPE, STDOUT
import pickle

os.environ["TERM"] = "vt100"


[docs] def tab_complete(text, state): return (glob.glob(text + "*") + [None])[state]
[docs] def main(arguments=None): """ *The main function used when `cl_utils.py` is run as a single script from the cl, or when installed as a cl command* """ # DETERMINE CURRENT DATA-REDUCTION SESSION from fundamentals.logs import emptyLogger from soxspipe.commonutils import data_organiser arguments = docopt(__doc__) if arguments["<workspaceDirectory>"]: # CHANGE INTO WORKSPACE DIRECTORY os.chdir(arguments["<workspaceDirectory>"]) arguments["<workspaceDirectory>"] = "." # setup the command-line util settings arguments = None eLog = emptyLogger() do = data_organiser(log=eLog, rootDir=".") currentSession, allSessions = do.session_list(silent=True) # QUICKLY SKIP IF PRODUCT EXIST if len(sys.argv[1:]) == 2 or len(sys.argv[1:]) == 4: if len(sys.argv[2]) > 3 and sys.argv[2].split(".")[-1].lower() == "sof": from soxspipe.commonutils import toolkit productPath, startNightDate = toolkit.predict_product_path(sys.argv[2]) if os.path.exists(productPath): basename = os.path.basename(productPath) print( f"The product of this recipe already exists: `{basename}`. To overwrite this product, rerun the pipeline command with the overwrite flag (-x)." ) sys.exit(0) clCommand = sys.argv[0].split("/")[-1] + " " + " ".join(sys.argv[1:]) if "-s" not in sys.argv and "prep" not in sys.argv and "session" not in sys.argv and currentSession: settingsFile = f"./sessions/{currentSession}/soxspipe.yaml" exists = os.path.exists(settingsFile) sys.argv.append("-s") sys.argv.append(settingsFile) if "prep" in sys.argv or "watch" in sys.argv or ("reduce" in sys.argv and "-s" not in sys.argv): arguments = docopt(__doc__) if "--settings" in arguments.keys(): del arguments["--settings"] su = tools( arguments=arguments, docString=__doc__, logLevel="WARNING", options_first=False, projectName="soxspipe", defaultSettingsFile=False, ) arguments, settings, log, dbConn = su.setup() arguments["workspaceDirectory"] = "." arguments["--settings"] = "./soxspipe.yaml" # SET ASTROPY LOGGING LEVEL try: from astropy import log as astrolog astrolog.setLevel("WARNING") except: pass # tab completion for raw_input readline.set_completer_delims(" \t\n;") readline.parse_and_bind("tab: complete") readline.set_completer(tab_complete) # UNPACK REMAINING CL ARGUMENTS USING `EXEC` TO SETUP THE VARIABLE NAMES # AUTOMATICALLY a = {} for arg, val in list(arguments.items()): if arg[0] == "-": varname = arg.replace("-", "") + "Flag" else: varname = arg.replace("<", "").replace(">", "") a[varname] = val if arg == "--dbConn": dbConn = val a["dbConn"] = val log.debug( "%s = %s" % ( varname, val, ) ) ## START LOGGING ## startTime = times.get_now_sql_datetime() log.debug("--- STARTING TO RUN THE cl_utils.py AT %s" % (startTime,)) # set options interactively if user requests if "interactiveFlag" in a and a["interactiveFlag"]: # load previous settings moduleDirectory = os.path.dirname(__file__) + "/resources" pathToPickleFile = "%(moduleDirectory)s/previousSettings.p" % locals() try: with open(pathToPickleFile): pass previousSettingsExist = True except: previousSettingsExist = False previousSettings = {} if previousSettingsExist: previousSettings = pickle.load(open(pathToPickleFile, "rb")) # x-raw-input # x-boolean-raw-input # x-raw-input-with-default-value-from-previous-settings # save the most recently used requests pickleMeObjects = [] pickleMe = {} theseLocals = locals() for k in pickleMeObjects: pickleMe[k] = theseLocals[k] pickle.dump(pickleMe, open(pathToPickleFile, "wb")) verbose = a["verboseFlag"] try: # PACK UP SOME OF THE CL SWITCHES INTO SETTINGS DICTIONARY if a["outputDirectory"]: settings["workspace-root-dir"] = a["outputDirectory"] command = (" ").join(sys.argv) if a["mbias"]: from soxspipe.recipes import soxs_mbias recipe = soxs_mbias( log=log, settings=settings, inputFrames=a["inputFrames"], verbose=verbose, overwrite=a["overwriteFlag"], command=command, debug=a["debugFlag"], ) mbiasFrame = recipe.produce_product() if a["mdark"]: from soxspipe.recipes import soxs_mdark recipe = soxs_mdark( log=log, settings=settings, inputFrames=a["inputFrames"], verbose=verbose, overwrite=a["overwriteFlag"], command=command, debug=a["debugFlag"], ) mdarkFrame = recipe.produce_product() if a["disp_solution"]: from soxspipe.recipes import soxs_disp_solution disp_map = soxs_disp_solution( log=log, settings=settings, inputFrames=a["inputFrames"], verbose=verbose, overwrite=a["overwriteFlag"], polyOrders=a["polyFlag"], command=command, debug=a["debugFlag"], ).produce_product() if a["order_centres"]: from soxspipe.recipes import soxs_order_centres order_table = soxs_order_centres( log=log, settings=settings, inputFrames=a["inputFrames"], verbose=verbose, overwrite=a["overwriteFlag"], polyOrders=a["polyFlag"], command=command, debug=a["debugFlag"], ).produce_product() if a["spat_solution"]: from soxspipe.recipes import soxs_spatial_solution disp_map = soxs_spatial_solution( log=log, settings=settings, inputFrames=a["inputFrames"], verbose=verbose, overwrite=a["overwriteFlag"], polyOrders=a["polyFlag"], command=command, debug=a["debugFlag"], ).produce_product() if a["mflat"]: from soxspipe.recipes import soxs_mflat recipe = soxs_mflat( log=log, settings=settings, inputFrames=a["inputFrames"], verbose=verbose, overwrite=a["overwriteFlag"], command=command, debug=a["debugFlag"], ) mflatFrame = recipe.produce_product() if a["stare"]: from soxspipe.recipes import soxs_stare recipe = soxs_stare( log=log, settings=settings, inputFrames=a["inputFrames"], verbose=verbose, overwrite=a["overwriteFlag"], command=command, debug=a["debugFlag"], ) reducedStare = recipe.produce_product() if a["nod"]: from soxspipe.recipes import soxs_nod recipe = soxs_nod( log=log, settings=settings, inputFrames=a["inputFrames"], verbose=verbose, overwrite=a["overwriteFlag"], command=command, debug=a["debugFlag"], ) reducedNod = recipe.produce_product() if a["offset"]: from soxspipe.recipes import soxs_offset recipe = soxs_offset( log=log, settings=settings, inputFrames=a["inputFrames"], verbose=verbose, overwrite=a["overwriteFlag"], command=command, debug=a["debugFlag"], ) reducedOffset = recipe.produce_product() if a["prep"]: do = data_organiser(log=log, rootDir=a["workspaceDirectory"], vlt=a["vltFlag"]) do.prepare(refresh=a["refreshFlag"]) if a["session"] and a["ls"]: from soxspipe.commonutils import data_organiser do = data_organiser(log=log, rootDir=".") currentSession, allSessions = do.session_list() if a["session"] and a["sessionId"] and not a["new"]: from soxspipe.commonutils import data_organiser do = data_organiser(log=log, rootDir=".") do.session_switch(a["sessionId"]) if a["session"] and a["new"]: from soxspipe.commonutils import data_organiser do = data_organiser(log=log, rootDir=".") sessionId = do.session_create(sessionId=a["sessionId"]) if a["list"]: from soxspipe.commonutils import data_organiser do = data_organiser(log=log, rootDir=a["workspaceDirectory"]) if a["ob"]: do.list_obs() elif a["sof"]: do.list_sofs() if a["raw"]: # EXPORT THE RAW FRAMES NEEDED TO REDUCE A SOF FILE TO AN `exported` DIRECTORY IN THE WORKSPACE DIRECTORY from soxspipe.commonutils import data_organiser import shutil do = data_organiser(log=log, rootDir=a["workspaceDirectory"]) if a["sof"]: rawFramePaths, rawTable = do.list_raw(a["sofFile"]) from tabulate import tabulate print(tabulate(rawTable[["file", "tag"]], headers="keys", tablefmt="psql", showindex=False)) if len(rawFramePaths) == 0: print( f"No raw frames found for the SOF file `{a['sofFile']}`. Please check that the SOF filename is correct." ) else: exportDir = a["workspaceDirectory"] + "/exported" if not os.path.exists(exportDir): os.makedirs(exportDir) for rawFramePath in rawFramePaths: basename = os.path.basename(rawFramePath) exportPath = exportDir + "/" + basename if not os.path.exists(exportPath): shutil.copy(rawFramePath, exportPath) print( f"Exported {len(rawFramePaths)} raw frames for the SOF file `{a['sofFile']}` to the `exported` directory in the workspace directory." ) return except FileExistsError as e: sys.exit(0) except Exception as e: log.error(f"{e}\n{clCommand}", exc_info=True) if a["reduce"]: exists = os.path.exists(a["workspaceDirectory"] + "/soxspipe.db") if not exists: print( f"Please run the 'soxspipe prep {a['workspaceDirectory']}' command to prepare your workspace command before attempting to reduce data" ) return watch = True if a["all"]: reductionTarget = "all" elif a["sof"]: reductionTarget = a["sofFile"] elif a["ob"]: reductionTarget = a["obid"] while watch: from soxspipe.commonutils import reducer collection = reducer( log=log, workspaceDirectory=a["workspaceDirectory"], reductionTarget=reductionTarget, settings=settings, pathToSettings=arguments["--settings"], quitOnFail=a["quitOnFailFlag"], overwrite=a["overwriteFlag"], verbose=verbose, refreshWorkspace=a["prepFlag"], ) if a["batchSize"]: a["batchSize"] = int(a["batchSize"]) collection.reduce(batch=a["batchSize"], multiprocess=a["multiprocessFlag"]) if a["watchFlag"]: xsec = 15 print(f"\nWaiting for {xsec} seconds before next reduction attempt\n") time.sleep(xsec) from soxspipe.commonutils import data_organiser do = data_organiser(log=log, rootDir=a["workspaceDirectory"]) do.prepare() do.close() del do else: watch = False from fundamentals import daemonise class myDaemon(daemonise): def action(self, **kwargs): import time self.log.info("starting the ``action`` method") pwd = kwargs["pwd"] # settings = kwargs["settings"] # settingsFile = kwargs["settingsFile"] currentSession = None while True: os.chdir(pwd) # if "." == arguments["--settings"][0]: # arguments["--settings"] = pwd + "/" + arguments["--settings"][1:] a["workspaceDirectory"] = pwd if currentSession: thisLog = log else: thisLog = self.log from soxspipe.commonutils import data_organiser do = data_organiser(log=thisLog, rootDir=pwd) do.prepare() if not currentSession: currentSession, allSessions = do.session_list(silent=True) if currentSession: from importlib import reload import logging logging.shutdown() reload(logging) if hasattr(logging, "PRINT"): delattr(logging, "PRINT") if hasattr(logging, "print"): delattr(logging, "print") if hasattr(logging.getLoggerClass(), "print"): delattr(logging.getLoggerClass(), "print") settingsFile = f"{pwd}/sessions/{currentSession}/soxspipe.yaml" arguments["--settings"] = settingsFile su = tools( arguments=arguments, docString=__doc__, logLevel="WARNING", options_first=False, projectName="soxspipe", defaultSettingsFile=False, ) argumentsIgnore, settings, log, dbConn = su.setup() if currentSession: from soxspipe.commonutils import reducer collection = reducer( log=log, workspaceDirectory=pwd, settings=settings, pathToSettings=settingsFile, quitOnFail=a["quitOnFailFlag"], daemon=True, ) collection.reduce() xsec = 15 print(f"\nWaiting for {xsec} seconds before next reduction attempt\n") time.sleep(xsec) self.log.info("completed the ``action`` method") return None # MAKE RELATIVE HOME PATH ABSOLUTE from os.path import expanduser home = expanduser("~") su = tools( arguments=arguments, docString=__doc__, logLevel="WARNING", options_first=False, projectName="soxspipe", defaultSettingsFile=False, ) arguments, settings, log, dbConn = su.setup() d = myDaemon(log=log, name="soxspipe", pwd=os.getcwd()) d.errLog = home + f"/.config/soxspipe/daemon.log" d.rootDir = home + f"/.config/soxspipe/" if a["start"]: d.start() elif a["stop"]: d.stop() elif a["status"]: d.status() # CALL FUNCTIONS/OBJECTS if "dbConn" in locals() and dbConn: dbConn.commit() dbConn.close() ## FINISH LOGGING ## endTime = times.get_now_sql_datetime() runningTime = times.calculate_time_difference(startTime, endTime) sys.argv[0] = os.path.basename(sys.argv[0]) if not a["prep"] and not a["session"] and not a["reduce"] and not a["watch"] and not a["list"]: log.print(f'\nRecipe Command: {(" ").join(sys.argv)}') log.print(f"Recipe Run Time: {runningTime}\n\n") print(f"{'='*70}\n") return
if __name__ == "__main__": main()