Source code for desimodel.inputs.focalplane

# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
desimodel.inputs.focalplane
==============================

Utilities for constructing a focalplane model.
"""
import os
import datetime
import re
import csv
import json
import gzip
import glob

import configobj
import numpy as np
from astropy.table import Table, Column

from desiutil.log import get_logger

from . import docdb

from ..io import datadir, findfile, load_focalplane

from .focalplane_utils import (
    compute_theta_phi_range,
    rotate_petals,
    create_nominal,
    device_compare,
    device_printdiff,
    update_exclusions,
    valid_states,
    create_tables,
    load_petal_fiber_map,
    propagate_state,
)


[docs]def devices_from_fiberpos(fp): """Populate focalplane properties from a fiberpos file. This is only used for testing consistency with the previous fiberpos files. It should not be used for work with the real instrument. The focalplane properties are modified in place. Args: fp (dict): The focalplane dictionary. Returns: None """ # Get the mapping AND device info from the fiberpos instead. # We CANNOT use the desimodel "load_fiberpos()" function here, since # it seems to strip out the ETC devices. Manually read the file. fiberpos_file = findfile("focalplane/fiberpos-all.fits") fpos = Table.read(fiberpos_file) # There is no "PETAL_ID" for the fake fiberpos, so we use PETAL for pet, dev, devtyp, blk, fib, xoff, yoff in zip( fpos["PETAL"], fpos["DEVICE"], fpos["DEVICE_TYPE"], fpos["SLITBLOCK"], fpos["BLOCKFIBER"], fpos["X"], fpos["Y"], ): fp[pet][dev]["DEVICE_ID"] = "FAKE" fp[pet][dev]["DEVICE_TYPE"] = devtyp fp[pet][dev]["CABLE"] = 0 fp[pet][dev]["CONDUIT"] = "NA" fp[pet][dev]["FWHM"] = 0.0 fp[pet][dev]["FRD"] = 0.0 fp[pet][dev]["ABS"] = 0.0 fp[pet][dev]["SLITBLOCK"] = blk fp[pet][dev]["BLOCKFIBER"] = fib fp[pet][dev]["OFFSET_X"] = xoff fp[pet][dev]["OFFSET_Y"] = yoff fp[pet][dev]["OFFSET_T"] = 0.0 fp[pet][dev]["OFFSET_P"] = 0.0 fp[pet][dev]["MIN_T"] = 0.0 fp[pet][dev]["MIN_P"] = 0.0 fp[pet][dev]["LENGTH_R1"] = 0.0 fp[pet][dev]["LENGTH_R2"] = 0.0 if (devtyp == "POS") or (devtyp == "ETC"): # This is a positioner. fp[pet][dev]["MAX_T"] = 380.0 fp[pet][dev]["MAX_P"] = 200.0 fp[pet][dev]["LENGTH_R1"] = 3.0 fp[pet][dev]["LENGTH_R2"] = 3.0 return
[docs]def devices_from_files( fp, posdir=None, fillfake=False, fakeoffset=False, fibermaps=None ): """Populate focalplane properties from information in files. This populates the focalplane with device information gathered from the "pos_settings" files in svn and from the "Petal verification" files on DocDB. The focalplane dictionary is modified in place. Args: fp (dict): The focalplane dictionary. posdir (str): Directory containing the many positioner conf files. fillfake (bool): If true, fill missing POS and ETC locations with a fake nominal positioner. fakeoffset (bool): If true, use theta / phi offsets that matched very old versions of fiberassign. fibermaps (list): (optional) Override list of tuples (DocDB number, DocDB version, DocDB csv file) of where to find the petal mapping files. Returns: None """ log = get_logger() fp = load_petal_fiber_map(existing=fp, fibermaps=fibermaps) # Parse all the positioner files. pos = dict() pospat = re.compile(r"unit_(.*).conf") if posdir is not None: for root, dirs, files in os.walk(posdir): for f in files: posmat = pospat.match(f) if posmat is not None: file_dev = posmat.group(1) pfile = os.path.join(root, f) print("parsing {}".format(pfile), flush=True) cnf = configobj.ConfigObj(pfile, unrepr=True) # Is this device used? if ("DEVICE_LOC" not in cnf) or (int(cnf["DEVICE_LOC"]) < 0): continue if "PETAL_ID" not in cnf: continue pet = int(cnf["PETAL_ID"]) if (pet < 0) or (pet not in fp): continue # Check that the positioner ID in the file name matches # the file contents. if file_dev != cnf["POS_ID"]: msg = ( "positioner file {} has device {} in its name " "but contains POS_ID={}".format(f, file_dev, cnf["POS_ID"]) ) raise RuntimeError(msg) # Add properties to dictionary pos[file_dev] = cnf break for devid, props in pos.items(): pet = int(props["PETAL_ID"]) dev = int(props["DEVICE_LOC"]) if dev not in fp[pet]: # This should never happen- all possible device locations # should have been pre-populated before calling this function. msg = "Device location {} on petal ID {} does not exist".format(dev, pet) raise RuntimeError(msg) fp[pet][dev]["DEVICE_ID"] = devid t_min, t_max, p_min, p_max = compute_theta_phi_range( props["PHYSICAL_RANGE_T"], props["PHYSICAL_RANGE_P"] ) fp[pet][dev]["OFFSET_T"] = props["OFFSET_T"] fp[pet][dev]["OFFSET_P"] = props["OFFSET_P"] fp[pet][dev]["LENGTH_R1"] = props["LENGTH_R1"] fp[pet][dev]["LENGTH_R2"] = props["LENGTH_R2"] fp[pet][dev]["MIN_T"] = t_min fp[pet][dev]["MAX_T"] = t_max fp[pet][dev]["MIN_P"] = p_min fp[pet][dev]["MAX_P"] = p_max if fillfake: t_min, t_max, p_min, p_max = compute_theta_phi_range(380.0, 200.0) for petal in list(sorted(fp.keys())): devlist = list(sorted(fp[petal].keys())) for dev in devlist: if (petal not in fp) or (dev not in fp[petal]): print(fp[petal][dev], flush=True) devtyp = fp[petal][dev]["DEVICE_TYPE"] if (devtyp != "POS") and (devtyp != "ETC"): continue if fp[petal][dev]["DEVICE_ID"] == "NONE": fp[petal][dev]["LENGTH_R1"] = 3.0 fp[petal][dev]["LENGTH_R2"] = 3.0 if fakeoffset: fp[petal][dev]["OFFSET_T"] = 0.0 fp[petal][dev]["OFFSET_P"] = 0.0 fp[petal][dev]["MIN_T"] = 0.0 fp[petal][dev]["MAX_T"] = 380.0 fp[petal][dev]["MIN_P"] = 0.0 fp[petal][dev]["MAX_P"] = 200.0 else: fp[petal][dev]["OFFSET_T"] = -170.0 fp[petal][dev]["OFFSET_P"] = -5.0 fp[petal][dev]["MIN_T"] = t_min fp[petal][dev]["MAX_T"] = t_max fp[petal][dev]["MIN_P"] = p_min fp[petal][dev]["MAX_P"] = p_max return
[docs]def create( testdir=None, posdir=None, fibermaps=None, petalloc=None, startvalid=None, fillfake=False, fakeoffset=False, fakefiberpos=False, reset=False, ): """Construct DESI focalplane and state files. This function gathers information from the following sources: - Petal verification files on DocDB - Positioner device configuration files (e.g. from svn). - DESI-0530, to get the mapping from device ID to device type as well as the nominal device X/Y offsets on petal 0 (for fillfake option). - Exclusion configobj files in $DESIMODEL/data/focalplane. Args: testdir (str): Override the output directory for testing. posdir (str): Directory containing the many positioner conf files. If None, simulate identical, nominal positioners. A None value will force fillfake=True. fibermaps (list): Override list of tuples (DocDB number, DocDB version, DocDB csv file) of where to find the petal mapping files. petalloc (dict): Mapping of petal ID to petal location. startvalid (str): The first time when this focalplane model is valid. ISO 8601 format string. fillfake (bool): If True, fill missing device locations with fake positioners with nominal values for use in simulations. fakeoffset (bool): If True, artificially sets the theta / phi angle offsets to zero. This replicates the behavior of legacy fiberassign and should only be used for testing. fakefiberpos (bool): If True, ignore the real fibermaps and load the old fiberpos file to get the mapping. Only useful for testing. reset (bool): If True, ignore all previous focalplane models and start with all positioners "good". Default propagates the state of most recent model, after verifying that the positioners are the same. Returns: None """ log = get_logger() outdir = testdir if outdir is None: outdir = os.path.join(datadir(), "focalplane") if posdir is None: fillfake = True if fakefiberpos and (posdir is not None): raise RuntimeError( "Cannot specify both fake positioners from fiberpos and real" " devices from posdir" ) if startvalid is None: startvalid = datetime.datetime.now(tz=datetime.timezone.utc) else: startvalid = datetime.datetime.strptime(startvalid, "%Y-%m-%dT%H:%M:%S%z") file_date = startvalid.isoformat(timespec="seconds") if (petalloc is None) and (posdir is not None): raise RuntimeError( "If specifying posdir, must also specify the petal locations." ) # The mapping of petal IDs to locations if petalloc is None: petalloc = {x: x for x in range(10)} # Create a focalplane containing all possible petal locations # and devices. fp = create_nominal(petalloc) if fakefiberpos: devices_from_fiberpos(fp) else: devices_from_files( fp, posdir=posdir, fillfake=fillfake, fakeoffset=fakeoffset, fibermaps=fibermaps, ) # Now rotate the X / Y offsets based on the petal location. rotate_petals(fp) # Construct the focaplane and state tables nrows = 0 allpetals = list(sorted(fp.keys())) for petal in allpetals: devlist = list(sorted(fp[petal].keys())) nrows += len(devlist) out_fp, out_state = create_tables(nrows) # Populate the table dev_cols = [ "OFFSET_X", "OFFSET_Y", "OFFSET_T", "OFFSET_P", "LENGTH_R1", "LENGTH_R2", ] st_cols = [ "MIN_T", "MAX_T", "MIN_P", "MAX_P", ] row = 0 for petal in allpetals: devlist = list(sorted(fp[petal].keys())) for dev in devlist: out_fp[row]["PETAL"] = fp[petal][dev]["PETAL"] out_fp[row]["DEVICE"] = dev out_fp[row]["LOCATION"] = fp[petal][dev]["PETAL"] * 1000 + dev out_fp[row]["PETAL_ID"] = petal out_fp[row]["DEVICE_ID"] = fp[petal][dev]["DEVICE_ID"] out_fp[row]["DEVICE_TYPE"] = fp[petal][dev]["DEVICE_TYPE"] out_fp[row]["SLITBLOCK"] = fp[petal][dev]["SLITBLOCK"] out_fp[row]["BLOCKFIBER"] = fp[petal][dev]["BLOCKFIBER"] out_fp[row]["CABLE"] = fp[petal][dev]["CABLE"] out_fp[row]["CONDUIT"] = fp[petal][dev]["CONDUIT"] out_fp[row]["FWHM"] = fp[petal][dev]["FWHM"] out_fp[row]["FRD"] = fp[petal][dev]["FRD"] out_fp[row]["ABS"] = fp[petal][dev]["ABS"] if fp[petal][dev]["SLITBLOCK"] < 0: # This must not be a POS device out_fp[row]["FIBER"] = -1 else: out_fp[row]["FIBER"] = ( fp[petal][dev]["PETAL"] * 500 + fp[petal][dev]["SLITBLOCK"] * 25 + fp[petal][dev]["BLOCKFIBER"] ) if (fp[petal][dev]["DEVICE_ID"] != "NONE") or fillfake: for col in dev_cols: if col in fp[petal][dev]: out_fp[row][col] = fp[petal][dev][col] for col in st_cols: if col in fp[petal][dev]: out_state[row][col] = fp[petal][dev][col] out_state[row]["LOCATION"] = out_fp[row]["LOCATION"] out_state[row]["STATE"] = valid_states["OK"] out_state[row]["EXCLUSION"] = "default" out_state[row]["TIME"] = file_date row += 1 # Now load the file(s) with the exclusion polygons # Add the legacy polygons to the dictionary for reference. # Also add an "unknown" polygon set which includes a large circle for the # theta arm that is the size of the patrol radius. excl = dict() # First the THETA arm. circs = [[[0.0 + 3.0, 0.0], 2.095]] seg = [ [2.095 + 3.0, -0.474], [1.358 + 3.0, -2.5], [-0.229 + 3.0, -2.5], [-1.241 + 3.0, -2.792], [-2.095 + 3.0, -0.356], ] segs = [seg] shp_theta = dict() shp_theta["circles"] = circs shp_theta["segments"] = segs # Now the PHI arm circs = [[[0.0 + 3.0, 0.0], 0.967]] seg_upper = [[-3.0 + 3.0, 0.990], [0.0 + 3.0, 0.990]] seg_lower = [ [-2.944 + 3.0, -1.339], [-2.944 + 3.0, -2.015], [-1.981 + 3.0, -1.757], [-1.844 + 3.0, -0.990], [0.0 + 3.0, -0.990], ] segs = [seg_upper, seg_lower] shp_phi = dict() shp_phi["circles"] = circs shp_phi["segments"] = segs excl["legacy"] = dict() excl["legacy"]["theta"] = shp_theta excl["legacy"]["phi"] = shp_phi excl["unknown"] = dict() excl["unknown"]["theta"] = dict() excl["unknown"]["theta"]["circles"] = [[[0.0, 0.0], 6.0]] excl["unknown"]["theta"]["segments"] = list() excl["unknown"]["phi"] = dict() excl["unknown"]["phi"]["circles"] = list() excl["unknown"]["phi"]["segments"] = list() # Get all available exclusion polygons from the desimodel data directory. fpdir = os.path.join(datadir(), "focalplane") excl_match = os.path.join(fpdir, "exclusions_*.conf") excl_files = glob.glob(excl_match) update_exclusions(excl, excl_files) # Propagate the device state. Unless we have the reset option, we need # to load the current focalplane model and try to use the state from that. # We will choose a time that is one second before the currently selected # time. if not reset: dtime = datetime.timedelta(seconds=1) oldtime = startvalid - dtime oldfp, oldexcl, oldstate, oldtmstr = load_focalplane(oldtime) checkrows = np.where(oldstate["LOCATION"] == 7000)[0] print(oldstate[checkrows]) log.info("Comparing generated focalplane to one from %s", oldtmstr) # Compare the old and new. These are the device properties we care # about when propagating state. In particular if positioner # calibration has modified arm lengths and angle ranges we don't # care. checkcols = [ "PETAL", "DEVICE", "PETAL_ID", "DEVICE_ID", "DEVICE_TYPE", "SLITBLOCK", "BLOCKFIBER", ] diff = device_compare(oldfp, out_fp, checkcols) device_printdiff(diff) if len(diff) > 0: msg = ( "Existing focalplane device properties have changed." " Refusing to propagate the device state. Use the 'reset'" " option to start with a new device state." ) raise RuntimeError(msg) propagate_state(out_state, excl, oldstate, oldexcl) # Ensure that the default polygon has been defined. if "default" not in excl.keys(): raise RuntimeError("No default exclusion polygon found in available files") # Now write out all of this collected information. Also write out an # initial "state" log as a starting point. Note that by having log # files (which contain datestamps) also have a "starting" date, it means # that we don't need a single log for the entire survey. out_fp_file = os.path.join(outdir, "desi-focalplane_{}.ecsv".format(file_date)) out_excl_file = os.path.join(outdir, "desi-exclusion_{}.json.gz".format(file_date)) out_state_file = os.path.join(outdir, "desi-state_{}.ecsv".format(file_date)) out_fp.write(out_fp_file, format="ascii.ecsv", overwrite=True) del out_fp out_state.write(out_state_file, format="ascii.ecsv", overwrite=True) del out_state # Now write out the exclusion polygons. Since these are not tabular, we # write to a JSON file. with gzip.open(out_excl_file, "wt", encoding='utf8') as pf: json.dump(excl, pf, indent=4) return