# Source code for soxspipe.commonutils.reducer

#!/usr/bin/env python
# encoding: utf-8
"""
*reduce all the data in a workspace, or target specific obs and files for reduction*

Author
: David Young

Date Created
: January 17, 2024
"""

# from memory_profiler import profile
from fundamentals import tools
from builtins import object
import sys
import os
import multiprocessing

# multiprocessing.set_start_method("spawn")

# NOTE(review): sets TERM for this whole process at import time; child
# processes inherit the environment. Presumably this keeps terminal-control
# output from downstream tools simple — confirm the intent.
os.environ["TERM"] = "vt100"


class reducer(object):
    """
    *reduce all the data in a workspace, or target specific obs and files for reduction*

    **Key Arguments:**

    - ``log`` -- logger
    - ``workspaceDirectory`` -- path to the root of the workspace
    - ``reductionTarget`` -- target for reduction: "all" or the name of an SOF file (ending in ".sof"). Other targets (e.g. "ob") are logged as unsupported and nothing is reduced. Default "all".
    - ``settings`` -- the settings dictionary
    - ``pathToSettings`` -- path to the settings file.
    - ``quitOnFail`` -- quit the pipeline (non-zero exit status) on any recipe failure
    - ``overwrite`` -- overwrite existing reductions. Default *False*.
    - ``daemon`` -- run in daemon mode (suppresses the separator lines printed around each recipe run). Default *False*.
    - ``verbose`` -- print verbose output to terminal. Default *False*.
    - ``refreshWorkspace`` -- refresh the workspace before reducing to collect new files. Default *False*.

    **Usage:**

    ```python
    from soxspipe.commonutils import reducer
    collection = reducer(
        log=log,
        workspaceDirectory="/path/to/workspace/root/",
        reductionTarget="all",
        settings=settings,
        pathToSettings="/path/to/settings.yaml"
    )
    collection.reduce()
    ```
    """

    def __init__(
        self,
        log,
        workspaceDirectory,
        reductionTarget="all",
        settings=False,
        pathToSettings=False,
        quitOnFail=False,
        overwrite=False,
        daemon=False,
        verbose=False,
        refreshWorkspace=False,
    ):
        self.log = log
        log.debug("instantiating a new 'reducer' object")
        self.settings = settings
        self.workspaceDirectory = workspaceDirectory
        self.reductionTarget = reductionTarget
        self.overwrite = overwrite
        self.pathToSettings = pathToSettings
        self.quitOnFail = quitOnFail
        self.daemon = daemon
        self.verbose = verbose

        # REQUEST THE WORKSPACE PARAMETERS FROM THE DATA-ORGANISER
        from soxspipe.commonutils import data_organiser

        do = data_organiser(
            log=log,
            rootDir=workspaceDirectory,
            dbConnect=False,
        )
        self.sessionId, allSessions = do.session_list(silent=True)
        do.close()

        # WITHOUT A SESSION THE REMAINING ATTRIBUTES ARE NOT SET; ``reduce``
        # CHECKS ``sessionId`` BEFORE USING THEM
        if self.sessionId is None:
            return None

        # RECIPES ARE RUN IN THIS ORDER WHEN REDUCING THE WHOLE WORKSPACE
        self.recipeList = [
            "mbias",
            "mdark",
            "disp_solution",
            "order_centres",
            "mflat",
            "spat_solution",
            "nod_std",
            "stare_std",
            "offset_std",
            "nod_obj",
            "stare_obj",
            "offset_obj",
        ]
        self.sessionPath = workspaceDirectory + "/sessions/" + self.sessionId
        self.sessionDB = workspaceDirectory + "/soxspipe.db"

        if refreshWorkspace:
            self.log.print("Refreshing the workspace before reduction...")
            do = data_organiser(log=self.log, rootDir=self.workspaceDirectory)
            do.prepare(refresh=False, report=False)
            do.close()

        return None

    def reduce(self, batch=False, multiprocess=False):
        """
        *reduce the selected data*

        **Key Arguments:**

        - ``batch`` -- maximum number of successful recipe runs in sequential mode (default: False, effectively unlimited). Not applied in multiprocessing mode.
        - ``multiprocess`` -- run all SOF files of each recipe in parallel via ``run_recipe_bulk`` (default: False). Targeted (non-"all") reductions always run sequentially.

        **Return:**

        - ``None``
        """
        self.log.debug("starting the ``reduce`` method")

        if self.sessionId is None:
            print("Please prepare this workspace using `soxspipe prep` before attempting to reduce the data.")
            return None

        from fundamentals import times
        import traceback
        from soxspipe.commonutils import data_organiser

        do = data_organiser(log=self.log, rootDir=self.workspaceDirectory)
        do.session_refresh(failure=None)
        do.close()

        # A TARGETED REDUCTION SELECTS ITS SOF FILES WITHOUT A RECIPE FILTER
        if self.reductionTarget != "all":
            self.recipeList = [False]

        if not batch:
            batch = 100000000

        batchCount = 0
        for rootRecipe in self.recipeList:
            # EVERY PATH THROUGH THIS LOOP ENDS IN A ``break``, SO EACH ROOT RECIPE IS
            # SELECTED ONCE; THE CONDITION SKIPS FURTHER RECIPES ONCE THE BATCH LIMIT IS HIT
            while batchCount < batch:
                # rawGroups WILL CONTAIN ONE RECIPE COMMAND PER ENTRY
                rawGroups = self.select_sof_files_to_process(
                    recipe=rootRecipe, reductionTarget=self.reductionTarget, arm=False
                )
                if rawGroups.empty:
                    break

                # ``run_recipe_bulk`` NEEDS A SINGLE RECIPE NAME; A TARGETED REDUCTION
                # (rootRecipe IS False) CAN MIX RECIPES SO IT RUNS SEQUENTIALLY INSTEAD
                if multiprocess and rootRecipe:
                    import sqlite3 as sql

                    # THE CONNECTION IS CLOSED BY ``run_recipe_bulk``
                    conn = sql.connect(self.sessionDB, timeout=300, autocommit=True, check_same_thread=False)
                    c = conn.cursor()
                    c.execute("PRAGMA busy_timeout = 100000")
                    c.execute("PRAGMA synchronous = OFF")

                    sofList = [self.sessionPath + "/sof/" + sof for sof in rawGroups["sof"].tolist()]
                    run_recipe_bulk(
                        log=self.log,
                        recipe=rootRecipe,
                        sofList=sofList,
                        commandList=rawGroups["command"].tolist(),
                        settings=self.settings,
                        overwrite=self.overwrite,
                        workspaceDirectory=self.workspaceDirectory,
                        conn=conn,
                        sessionId=self.sessionId,
                    )
                    break

                fail = False
                for _, row in rawGroups.iterrows():
                    if batchCount >= batch:
                        self.log.print(f"Batch limit of {batch} reached, pausing reductions.")
                        break

                    recipe = row["recipe"].replace("_std", "").replace("_obj", "")
                    startTime = times.get_now_sql_datetime()
                    sof = self.sessionPath + "/sof/" + row["sof"]
                    try:
                        run_recipe(
                            self.log,
                            recipe,
                            sof,
                            settings=self.settings,
                            overwrite=self.overwrite,
                            command=row["command"],
                            verbose=self.verbose,
                        )
                        batchCount += 1
                    except FileExistsError:
                        continue
                    except Exception:
                        # ONE FAILURE RESET THE SOF FILES SO FUTURE RECIPES DON'T RELY ON FAILED PRODUCTS
                        self.log.error(f"\n\nRecipe failed with the following error:\n\n{traceback.format_exc()}")
                        self.log.error(
                            f'\nRecipe Command: {row["command"].replace("-obj ", " ").replace("-std ", " ")}\n\n'
                        )
                        fail = True
                        if self.quitOnFail:
                            # NON-ZERO STATUS SO SHELLS/SCHEDULERS SEE THE FAILURE
                            sys.exit(1)

                    # ONLY THE FIRST RECIPE OF A TARGETED REDUCTION HONOURS ``overwrite``
                    if self.reductionTarget != "all":
                        self.overwrite = False

                    if not self.daemon:
                        print(f"{'='*70}\n")

                    ## FINISH LOGGING ##
                    endTime = times.get_now_sql_datetime()
                    runningTime = times.calculate_time_difference(startTime, endTime)
                    sys.argv[0] = os.path.basename(sys.argv[0])
                    self.log.print(
                        f'\nRecipe Command: {row["command"].replace("_obj ", " ").replace("_std ", " ")} '
                    )
                    self.log.print(f"Recipe Run Time: {runningTime}\n\n")
                    if not self.daemon:
                        print(f"{'='*70}\n")

                    if fail:
                        do = data_organiser(log=self.log, rootDir=self.workspaceDirectory)
                        reset = do.session_refresh()
                        do.close()
                        if reset:
                            # STOP PROCESSING THIS RECIPE'S REMAINING SOF FILES
                            print(f"BACK TO THE START! {rootRecipe}\n\n")
                            break
                break

        if self.reductionTarget == "all":
            do = data_organiser(log=self.log, rootDir=self.workspaceDirectory)
            incompleteSets = do.get_incomplete_raw_frames_set()
            do.close()
            if len(incompleteSets.index):
                from tabulate import tabulate

                print(
                    "\nSOME CALIBRATION FRAMES ARE NOT PRESENT (OR FAILED TO BE BUILT) FOR THE FOLLOWING DATA SETS AND THEY CANNOT BE REDUCED:"
                )
                print(tabulate(incompleteSets, headers="keys", tablefmt="psql", showindex=False))

        do = data_organiser(log=self.log, rootDir=self.workspaceDirectory)
        do.session_refresh(failure=None)
        do.close()

        self.log.debug("completed the ``reduce`` method")
        return None

    def select_sof_files_to_process(self, recipe=False, reductionTarget=False, batch=False, arm=False):
        """*select all of the SOF files still requiring processing*

        **Key Arguments:**

        - ``recipe`` -- the name of the recipe to filter by (optional)
        - ``reductionTarget`` -- target for reduction: "all" or the name of an SOF file (ending in ".sof"). A falsy value is treated as "all". Any other target is logged as unsupported and an empty selection is returned. Default False.
        - ``batch`` -- number of SOF files to return (default: False, all). Only applied when the target is "all".
        - ``arm`` -- filter by arm (default: False, all). Only applied when the target is "all".

        **Return:**

        - `rawGroups` -- a dataframe with ``recipe``, ``sof`` and ``command`` columns (empty when nothing needs processing)

        **Usage:**

        ```python
        rawGroups = reducer.select_sof_files_to_process()
        ```
        """
        self.log.debug("starting the ``select_sof_files_to_process`` method")
        import pandas as pd
        import sqlite3 as sql

        emptyFrame = pd.DataFrame(columns=["recipe", "sof", "command"])

        # NO TARGET MEANS "EVERYTHING STILL REQUIRING PROCESSING" (SEE USAGE ABOVE)
        if not reductionTarget:
            reductionTarget = "all"

        isSofTarget = isinstance(reductionTarget, str) and reductionTarget.split(".")[-1].lower() == "sof"
        if reductionTarget != "all" and not isSofTarget:
            self.log.warning(
                f"The reduction target '{reductionTarget}' is not supported; expected 'all' or the name of an SOF file."
            )
            return emptyFrame

        conn = sql.connect(self.sessionDB, timeout=300, autocommit=True, check_same_thread=False)
        try:
            conn.execute("PRAGMA busy_timeout = 100000")
            conn.execute("PRAGMA synchronous = OFF")

            if reductionTarget == "all":
                # GET THE GROUPS OF FILES NEEDING REDUCED, ASSIGN THE CORRECT COMMAND TO EXECUTE THE RECIPE
                conditions = ["recipe_order is not null", "complete = 1"]
                params = []
                if recipe:
                    conditions.append("recipe = ?")
                    params.append(recipe)
                else:
                    conditions.append("recipe is not null")
                if arm:
                    conditions.append("`eso seq arm` = ?")
                    params.append(arm)
                sqlQuery = f"SELECT * FROM raw_frame_sets where {' and '.join(conditions)} order by recipe_order, sof"
                if batch:
                    sqlQuery += " LIMIT ?"
                    params.append(int(batch))
            else:
                sqlQuery = "select sof from product_frames where sof = ? and complete = 1"
                # NEST THE LOOKUP 4 MORE TIMES (5 LEVELS IN TOTAL); EACH LEVEL EMBEDS THE
                # PREVIOUS QUERY TWICE, SO THE PLACEHOLDER COUNT DOUBLES EVERY PASS
                for _ in range(4):
                    sqlQuery = f"SELECT distinct sof FROM product_frames WHERE file IN (SELECT file FROM sof_map_base WHERE sof in ({sqlQuery})) or sof in ({sqlQuery})"
                sqlQuery = (
                    f"SELECT distinct sof, recipe from raw_frame_sets WHERE sof in ({sqlQuery}) order by recipe_order, sof"
                )
                # EVERY PLACEHOLDER STANDS FOR THE TARGET SOF NAME
                params = [reductionTarget] * sqlQuery.count("?")

            rawGroups = pd.read_sql(sqlQuery, con=conn, params=params)
        finally:
            conn.close()

        if not len(rawGroups.index):
            if reductionTarget != "all":
                self.log.warning("The SOF file selected for processing is either missing or incomplete.")
            else:
                self.log.info("No SOF files require processing.")
            return emptyFrame

        # FILTER DATA FRAME
        if recipe:
            rawGroups = rawGroups.loc[rawGroups["recipe"] == recipe]
        # WORK ON AN INDEPENDENT COPY BEFORE ADDING THE COMMAND COLUMN
        rawGroups = rawGroups.copy()

        rawGroups["command"] = (
            "soxspipe "
            + rawGroups["recipe"]
            .str.replace("-obj", "")
            .str.replace("_obj", "")
            .str.replace("-std", "")
            .str.replace("_std", "")
            + " sof/"
            + rawGroups["sof"]
        )
        if self.pathToSettings:
            rawGroups["command"] += f" -s {self.pathToSettings}"

        self.log.debug("completed the ``select_sof_files_to_process`` method")
        return rawGroups[["recipe", "sof", "command"]].drop_duplicates()
def run_recipe(log, recipe, sof, settings, overwrite, command=False, verbose=False, turnOffMP=False):
    """*execute a pipeline recipe*

    **Key Arguments:**

    - ``log`` -- logger
    - ``recipe`` -- the name of the recipe to execute. Exact names: "mbias", "mdark", "disp_solution", "order_centres", "mflat", "spat_solution". Any name containing "offset", "nod" or "stare" selects that recipe (e.g. "nod_obj").
    - ``sof`` -- path to the sof file containing the files the recipe requires
    - ``settings`` -- the settings dictionary
    - ``overwrite`` -- overwrite existing reductions. Default *False*.
    - ``command`` -- the command used to run the recipe
    - ``verbose`` -- print verbose output to terminal. Default *False*.
    - ``turnOffMP`` -- turn off multiprocessing mode. Default *False*.

    **Return:**

    - ``productPath``, ``qcTable`` -- the two values returned by the recipe's ``produce_product`` method

    **Raises:**

    - ``ValueError`` -- if ``recipe`` does not name a known recipe

    **Usage:**

    ```python
    from soxspipe.commonutils.reducer import run_recipe
    productPath, qcTable = run_recipe(
        log, "mbias", "/path/to/sofs/my_bias_files.sof", settings=settings, overwrite=False
    )
    ```
    """
    log.debug("starting the ``run_recipe`` method")

    # REJECT NON-STRING NAMES (E.G. False) BEFORE ANY SUBSTRING TEST
    if not isinstance(recipe, str):
        raise ValueError(f"Unknown soxspipe recipe: {recipe!r}")

    # RECIPE CLASSES ARE IMPORTED LAZILY SO ONLY THE ONE NEEDED IS LOADED
    if recipe == "mbias":
        from soxspipe.recipes import soxs_mbias as recipeClass
    elif recipe == "mdark":
        from soxspipe.recipes import soxs_mdark as recipeClass
    elif recipe == "disp_solution":
        from soxspipe.recipes import soxs_disp_solution as recipeClass
    elif recipe == "order_centres":
        from soxspipe.recipes import soxs_order_centres as recipeClass
    elif recipe == "mflat":
        from soxspipe.recipes import soxs_mflat as recipeClass
    elif recipe == "spat_solution":
        from soxspipe.recipes import soxs_spatial_solution as recipeClass
    # SCIENCE RECIPES ARE MATCHED BY SUBSTRING (e.g. "nod_obj", "nod_std")
    elif "offset" in recipe:
        from soxspipe.recipes import soxs_offset as recipeClass
    elif "nod" in recipe:
        from soxspipe.recipes import soxs_nod as recipeClass
    elif "stare" in recipe:
        from soxspipe.recipes import soxs_stare as recipeClass
    else:
        raise ValueError(f"Unknown soxspipe recipe: {recipe!r}")

    soxs_recipe = recipeClass(
        log=log,
        settings=settings,
        inputFrames=sof,
        overwrite=overwrite,
        command=command,
        verbose=verbose,
        turnOffMP=turnOffMP,
    )
    productPath, qcTable = soxs_recipe.produce_product()
    # RELEASE THE RECIPE OBJECT EXPLICITLY BEFORE RETURNING
    del soxs_recipe

    log.debug("completed the ``run_recipe`` method")
    return productPath, qcTable
def run_recipe_bulk(log, recipe, sofList, commandList, settings, overwrite, workspaceDirectory, conn, sessionId):
    """*execute a pipeline recipe in multiprocessing mode*

    **Key Arguments:**

    - ``log`` -- logger
    - ``recipe`` -- the name of the recipe to execute
    - ``sofList`` -- a list of paths to the sof files containing the files the recipe requires (not modified)
    - ``commandList`` -- a list of the commands used to run the recipe for each sof file (same order as ``sofList``)
    - ``settings`` -- the settings dictionary
    - ``overwrite`` -- overwrite existing reductions. Default *False*.
    - ``workspaceDirectory`` -- path to the root of the workspace
    - ``conn`` -- a connection to the workspace database. It is closed by this function.
    - ``sessionId`` -- the session ID of the workspace (selects the ``status_<sessionId>`` column)

    **Return:**

    - ``None``
    """
    log.debug("starting the ``run_recipe_bulk`` method")
    from fundamentals import fmultiprocess
    from soxspipe.commonutils import data_organiser
    import pandas as pd
    import shutil

    def wrapper(inputDict, log, recipe, settings, overwrite, workspaceDirectory, wrapperTurnOffMP=True):
        """*run one recipe/SOF pair and summarise the outcome as a dictionary*"""
        import traceback

        returnDict = {
            "status": None,
            "sof": inputDict["sof"],
            "productPath": None,
            "qcTable": None,
            "error_message": None,
        }
        try:
            productPath, qcTable = run_recipe(
                log=log,
                recipe=recipe,
                sof=inputDict["sof"],
                settings=settings,
                overwrite=overwrite,
                command=inputDict["command"],
                turnOffMP=wrapperTurnOffMP,
            )
            returnDict["status"] = "pass"
            returnDict["productPath"] = productPath
            returnDict["qcTable"] = qcTable
            # A RUN WHOSE QC VALUES FALL OUTSIDE LIMITS IS RECORDED AS A FAILURE
            failedQcs = qcTable[qcTable["qc_flag"] == "fail"]
            if len(failedQcs):
                returnDict["status"] = "fail"
                failedNames = failedQcs["qc_name"].tolist()
                returnDict["error_message"] = (
                    f"The following QC values are outside of acceptable limits: {', '.join(failedNames)}."
                )
        except FileExistsError as e:
            # THE MESSAGE DISTINGUISHES PREVIOUSLY FAILED FROM PREVIOUSLY COMPLETED RUNS
            if "previously failed" in str(e):
                returnDict["status"] = "previous-fail"
            elif "product of this recipe" in str(e):
                returnDict["status"] = "previous-pass"
            else:
                returnDict["status"] = "fail"
                returnDict["error_message"] = e
        except Exception as e:
            # ONE FAILURE RESET THE SOF FILES SO FUTURE RECIPES DON'T RELY ON FAILED PRODUCTS
            log.error(f"\n\nRecipe failed with the following error:\n\n{traceback.format_exc()}")
            log.error(
                f'\nRecipe Command: {inputDict["command"].replace("-obj ", " ").replace("_obj ", " ").replace("-std ", " ").replace("_std ", " ")}\n\n'
            )
            returnDict["status"] = "fail"
            returnDict["error_message"] = e
        return returnDict

    inputDicts = [{"sof": sof, "command": command} for sof, command in zip(sofList, commandList)]

    poolSize = False
    turnOffMP = False
    wrapperTurnOffMP = True
    if "mflat" in recipe:
        # A SMALLER POOL FOR MFLAT (PER THE MESSAGE BELOW: TO AVOID MEMORY ISSUES)
        poolSize = 3
        print(
            f"Running {len(inputDicts)} reductions for the {recipe.upper()} recipe in multiprocessing mode with a pool size of {poolSize} to avoid memory issues..."
        )
    log.print(f"Running {len(inputDicts)} reductions for the {recipe.upper()} recipe in multiprocessing mode...")

    results = fmultiprocess(
        log=log,
        function=wrapper,
        inputArray=inputDicts,
        poolSize=poolSize,
        timeout=36000,
        settings=settings,
        overwrite=overwrite,
        recipe=recipe,
        workspaceDirectory=workspaceDirectory,
        wrapperTurnOffMP=wrapperTurnOffMP,
        turnOffMP=turnOffMP,
        mute=True,
        progressBar=True,
    )

    ## COLLECT TOGETHER THE RESULTS AND UPDATE THE DATABASE
    # ``passing``/``failing`` INCLUDE PRE-EXISTING OUTCOMES (FOR THE DATABASE);
    # THE COUNTERS ONLY SEPARATE NEW PASSES/FAILS FROM EVERYTHING ELSE (FOR THE SUMMARY)
    passing = []
    failing = []
    qcTables = []
    errorMessages = []
    errorSOF = []
    newPassCount = 0
    newFailCount = 0
    preExistingCount = 0
    for result in results:
        sof = os.path.basename(result["sof"])
        status = result["status"]
        if status == "pass":
            newPassCount += 1
            passing.append(sof)
            qcTables.append(result["qcTable"])
        elif status == "fail":
            newFailCount += 1
            failing.append(sof)
            errorSOF.append(sof)
            errorMessages.append(result["error_message"])
            if result["qcTable"] is not None:
                qcTables.append(result["qcTable"])
        else:
            preExistingCount += 1
            if status == "previous-pass":
                passing.append(sof)
            elif status == "previous-fail":
                failing.append(sof)

    print(
        f"Number of successful {recipe} reductions: {newPassCount}. Number of failed {recipe} reductions: {newFailCount}. Number of pre-existing {recipe} reductions: {preExistingCount}.\n"
    )

    # THE STATUS COLUMN NAME IS AN IDENTIFIER AND CANNOT BE BOUND; SOF NAMES ARE PASSED AS PARAMETERS
    statusCol = f"status_{sessionId}"
    passMarks = ",".join("?" * len(passing))
    failMarks = ",".join("?" * len(failing))
    try:
        c = conn.cursor()
        # COUNT ROWS WHOSE RECORDED STATUS IS ABOUT TO CHANGE
        c.execute(
            f"select count(*) from product_frames where (sof in ({passMarks}) and {statusCol} = 'fail') "
            f"or (sof in ({failMarks}) and ({statusCol} != 'fail' or {statusCol} is null))",
            passing + failing,
        )
        count = c.fetchone()[0]

        if passing or failing:
            c.execute(
                f"update product_frames set {statusCol} = 'pass' where sof in ({passMarks}) "
                f"and ({statusCol} != 'pass' or {statusCol} is null)",
                passing,
            )
            c.execute(
                f"update product_frames set {statusCol} = 'fail' where sof in ({failMarks}) "
                f"and ({statusCol} != 'fail' or {statusCol} is null)",
                failing,
            )
            if errorSOF:
                error_rows = [(str(error), sof) for sof, error in zip(errorSOF, errorMessages)]
                c.executemany(
                    "update product_frames set error_message = ? where sof = ?",
                    error_rows,
                )
        c.close()
    finally:
        conn.close()

    if count:
        do = data_organiser(log=log, rootDir=workspaceDirectory)
        do.session_refresh(failure=True)
        do.close()

    if qcTables:
        qcTables = pd.concat(qcTables, ignore_index=True)
        do = data_organiser(log=log, rootDir=workspaceDirectory)
        do._dataframe_to_sqlite(qcTables, "quality_control", replace=False)
        do.close()

    tmpDir = workspaceDirectory + "/tmp/"
    if os.path.exists(tmpDir):
        shutil.rmtree(tmpDir)

    log.debug("completed the ``run_recipe_bulk`` method")
    return None