Source code for desimodel.io

# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
desimodel.io
============

I/O utility functions for files in desimodel.
"""
import os
import re
import warnings
from datetime import datetime, timezone

import yaml
import gzip
import numpy as np
from astropy.io import fits
from astropy.table import Table, Column

from desiutil.log import get_logger

log = get_logger()


_thru = dict()


# ADM raise a custom exception when an environment variable is missing.
[docs]class MissingEnvVar(Exception): pass
[docs]def load_throughput(channel): """Returns specter Throughput object for the given channel 'b', 'r', or 'z'. Parameters ---------- channel : {'b', 'r', 'z'} Spectrograph channel. Returns ------- Throughput A specter throughput object. """ import specter.throughput channel = channel.lower() global _thru if channel not in _thru: thrufile = findfile("throughput/thru-{0}.fits".format(channel)) _thru[channel] = specter.throughput.load_throughput(thrufile) return _thru[channel]
# # # _psf = dict()
[docs]def load_psf(channel): """Returns specter PSF object for the given channel 'b', 'r', or 'z'. Parameters ---------- channel : {'b', 'r', 'z'} Spectrograph channel. Returns ------- PSF A specter PSF object. """ import specter.psf channel = channel.lower() global _psf if channel not in _psf: psffile = findfile("specpsf/psf-{0}.fits".format(channel)) _psf[channel] = specter.psf.load_psf(psffile) return _psf[channel]
# # # _params = None
[docs]def load_desiparams(): """Returns DESI parameter dictionary loaded from ``$DESIMODEL/data/desi.yaml``. Returns ------- :class:`dict` The parameters read from the YAML file. """ global _params if _params is None: desiparamsfile = findfile("desi.yaml") with open(desiparamsfile) as par: _params = yaml.safe_load(par) # - for temporary backwards compability after 'exptime' -> 'exptime_dark' if ("exptime" not in _params) and ("exptime_dark" in _params): _params["exptime"] = _params["exptime_dark"] # - Augment params with wavelength coverage from specpsf files # - wavemin/max = min/max wavelength covered by *any* fiber on the CCD # - wavemin/max_all = min/max wavelength covered by *all* fibers for channel in ["b", "r", "z"]: hdr = fits.getheader(findfile("specpsf/psf-{}.fits".format(channel)), 0) _params["ccd"][channel]["wavemin"] = hdr["WAVEMIN"] _params["ccd"][channel]["wavemax"] = hdr["WAVEMAX"] _params["ccd"][channel]["wavemin_all"] = hdr["WMIN_ALL"] _params["ccd"][channel]["wavemax_all"] = hdr["WMAX_ALL"] return _params
# # # # Added and still needs to be committed and pushed to desihub _gfa = None
[docs]def load_gfa(): """Returns GFA table from ``$DESIMODEL/data/focalplane/gfa.ecsv``. Returns ------- :class:`~astropy.table.Table` The data from the ECSV file. """ global _gfa if _gfa is None: gfaFile = findfile("focalplane/gfa.ecsv") _gfa = Table.read(gfaFile, format="ascii.ecsv") return _gfa
# # # _deviceloc = None
[docs]def load_deviceloc(): """Returns a table from ``$DESIMODEL/data/focalplane/fiberpos-all.fits``. Returns ------- :class:`~astropy.table.Table` The data from the FITS file, with columns converted to uppercase. """ global _deviceloc if _deviceloc is None: fiberposfile = findfile("focalplane/fiberpos-all.fits") _deviceloc = Table.read(fiberposfile) # - Convert to upper case if needed # - Make copy of colnames b/c they are updated during iteration for col in list(_deviceloc.colnames): if col.islower(): _deviceloc.rename_column(col, col.upper()) return _deviceloc
_fiberpos = None
[docs]def load_fiberpos(): """Returns fiberpos table from ``$DESIMODEL/data/focalplane/fiberpos.fits``. Returns ------- :class:`~astropy.table.Table` The data from the FITS file, sorted by ``FIBER``. """ global _fiberpos if _fiberpos is None: fiberposfile = findfile("focalplane/fiberpos.fits") _fiberpos = Table.read(fiberposfile) _fiberpos.sort("FIBER") # - Convert to upper case if needed # - Make copy of colnames b/c they are updated during iteration for col in list(_fiberpos.colnames): if col.islower(): _fiberpos.rename_column(col, col.upper()) # - Temporary backwards compatibility for renamed columns if "POSITIONER" in _fiberpos.colnames: warnings.warn( "old fiberpos.fits with POSITIONER column instead of LOCATION; please update your $DESIMODEL checkout", DeprecationWarning, ) _fiberpos["LOCATION"] = _fiberpos["POSITIONER"] else: _fiberpos["POSITIONER"] = _fiberpos["LOCATION"] if "SPECTROGRAPH" in _fiberpos.colnames: warnings.warn( "old fiberpos.fits with SPECTROGRAPH column instead of SPECTRO; please update your $DESIMODEL checkout", DeprecationWarning, ) _fiberpos["SPECTRO"] = _fiberpos["SPECTROGRAPH"] else: _fiberpos["SPECTROGRAPH"] = _fiberpos["SPECTRO"] return _fiberpos
# # # _tiles = dict()
[docs]def load_tiles(onlydesi=True, extra=False, tilesfile=None, cache=True, programs=None, surveyops=True): """Return DESI tiles structure from ``$DESI_SURVEYOPS/trunk/ops/tiles-main.ecsv``. Parameters ---------- onlydesi : :class:`bool`, optional If ``True``, trim to just the tiles in the DESI footprint. extra : :class:`bool`, optional If ``True``, include extra layers with ``PROGRAM='EXTRA'``. tilesfile : :class:`str`, optional Name of tiles file to load; or None for default. See Notes for details. cache : :class:`bool`, optional If ``False``, force reload of data from tiles file, instead of using cached values. programs : :class:`list` or `str`, optional Pass a list of program names to restrict to only those programs, e.g. ["DARK", "BACKUP"]. surveyops : :class:`bool` If ``True`` then find the relevant path for the $DESI_SURVEYOPS directory rather than the $DESIMODEL directory. Returns ------- :class:`~astropy.io.fits.FITS_rec` The data table portion of the FITS file. Raises ------ :exc:`FileNotFoundError` If the value of `tilesfile` does not exist. Notes ----- Keyword-based environment variable expansion is performed on the `tilesfile` value, so *e.g.*:: tiles = load_tiles(tilesfile='{HOME}/my-tiles.fits') will be expanded with the value of :envvar:`HOME`. If the parameter `tilesfile` is set, this function uses the following search method: 0. Paths corresponding to both $DESI_SURVEYOPS/trunk/ops and $DESI_SURVEYOPS/ops are always both checked, to cover different svn checkout approaches. 1. If the value includes an explicit path, even ``./``, use that file. 2. If the value does *not* include an explicit path, *and* the file name is identical to a file in ``$DESI_SURVEYOPS/trunk/ops/``, use the file in ``$DESI_SURVEYOPS/trunk/ops/`` and issue a warning. 3. If no matching file can be found at all, raise an exception. """ global _tiles if tilesfile is None: # Use the default if surveyops: tilesfile = findfile("tiles-main.ecsv", surveyops=surveyops) else: tilesfile = findfile("footprint/desi-tiles.fits", surveyops=surveyops) else: # If full path isn't included, check local vs $DESI_SURVEYOPS/ops tilepath, filename = os.path.split(tilesfile) if tilepath == "": have_local = os.path.isfile(tilesfile) checkfile = findfile(tilesfile, surveyops=surveyops) have_dmdata = os.path.isfile(checkfile) if have_dmdata: if have_local: msg = ( "$DESI_SURVEYOPS/(trunk)/ops/{0} is shadowed by a local" + " file. Choosing $DESI_SURVEYOPS file." + ' Use tilesfile="./{0}" if you want the local copy' + " instead." ).format(tilesfile) warnings.warn(msg) tilesfile = checkfile if not (have_local or have_dmdata): msg = ( 'File "{}" does not exist locally or in ' + "$DESI_SURVEYOPS/(trunk)/ops/!" ).format(tilesfile) raise FileNotFoundError(msg) # - standarize path location tilesfile = os.path.abspath(tilesfile.format(**os.environ)) log.debug("Loading tiles from %s", tilesfile) # ADM allow reading from either .fits or .ecsv files. # ADM guard against the possibility that the file is zipped. isfits = ".fits" in os.path.basename(tilesfile) if cache and tilesfile in _tiles: tiledata = _tiles[tilesfile] else: if isfits: with fits.open(tilesfile, memmap=False) as hdulist: tiledata = hdulist[1].data # # Temporary workaround for problem identified in # https://github.com/desihub/desimodel/issues/30 # if any([c.bzero is not None for c in tiledata.columns]): foo = [_tiles[k].dtype for k in tiledata.dtype.names] # - Check for out-of-date tiles file if np.issubdtype(tiledata["OBSCONDITIONS"].dtype, np.unsignedinteger): warnings.warn( "Old desi-tiles.fits with uint16 OBSCONDITIONS; please update your $DESIMODEL checkout.", DeprecationWarning, ) else: tiledata = Table.read(tilesfile) # - load cache for next time if cache: _tiles[tilesfile] = tiledata # - Filter to only the DESI footprint if requested subset = np.ones(len(tiledata), dtype=bool) if onlydesi: subset &= tiledata["IN_DESI"] > 0 # - Filter out PROGRAM=EXTRA tiles if requested if not extra: subset &= ~np.char.startswith(tiledata["PROGRAM"], "EXTRA") # ADM filter to program names if requested. if programs is not None: # ADM guard against a single string being passed. programs = np.atleast_1d(programs) isprog = np.zeros(len(tiledata), dtype=bool) for program in programs: isprog |= tiledata["PROGRAM"] == program subset &= isprog if np.all(subset): return tiledata else: return tiledata[subset]
_platescale = None
[docs]def load_platescale(): """Loads platescale.txt. Returns ------- :class:`~numpy.recarray` The data table read from the file. Notes ----- The returned object has these columns: radius Radius from center of focal plane [mm]. theta Radial angle that has a centroid at this radius [deg]. radial_platescale Meridional (radial) plate scale [um/arcsec]. az_platescale: Sagittal (azimuthal) plate scale [um/arcsec]. arclength: Unknown description. """ global _platescale if _platescale is not None: return _platescale infile = findfile("focalplane/platescale.txt") columns = [ ("radius", "f8"), ("theta", "f8"), ("radial_platescale", "f8"), ("az_platescale", "f8"), ("arclength", "f8"), ] try: _platescale = np.loadtxt(infile, usecols=[0, 1, 6, 7, 8], dtype=columns) except (IndexError,ValueError): # - no "arclength" column in this version of desimodel/data # - Get info from separate rzs file instead _platescale = np.loadtxt(infile, usecols=[0, 1, 6, 7, 7], dtype=columns) rzs = Table.read(findfile("focalplane/rzsn.txt"), format="ascii") from scipy.interpolate import interp1d from numpy.lib.recfunctions import append_fields arclength = interp1d(rzs["R"], rzs["S"], kind="quadratic") _platescale["arclength"] = arclength(_platescale["radius"]) return _platescale
_focalplane = None def ensure_focalplane_loaded(): global _focalplane if _focalplane is not None: return # First call, parse all data files. fpdir = os.path.join(datadir(), "focalplane") fppat = re.compile(r"^desi-focalplane_(.*)\.ecsv$") stpat = re.compile(r"^desi-state_(.*)\.ecsv$") expat = re.compile(r"^desi-exclusion_(.*)\.(?:yaml|json).*$") fpraw = dict() msg = "Loading focalplanes from {}".format(fpdir) log.debug(msg) for root, dirs, files in os.walk(fpdir): for f in files: fpmat = fppat.match(f) if fpmat is not None: dt = fpmat.group(1) if dt not in fpraw: fpraw[dt] = dict() fpraw[dt]["fp"] = os.path.join(root, f) continue stmat = stpat.match(f) if stmat is not None: dt = stmat.group(1) if dt not in fpraw: fpraw[dt] = dict() fpraw[dt]["st"] = os.path.join(root, f) continue exmat = expat.match(f) if exmat is not None: dt = exmat.group(1) if dt not in fpraw: fpraw[dt] = dict() fpraw[dt]["ex"] = os.path.join(root, f) break # Check that we have all 3 files needed for each timestamp for ts, files in fpraw.items(): for key in ["fp", "st", "ex"]: if key not in files: msg = "Focalplane state for time {} is missing one of \ the 3 required files (focalplane, state, exclusion)".format( ts ) raise RuntimeError(msg) # Now load the files for each time into our cached global variable. _focalplane = list() for ts in sorted(fpraw.keys()): dt = None file_dt = None try: file_dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S%z") dt = file_dt except ValueError: # This is an old file with implicit UTC times (no offset) # Load it as-is and then set the time zone. file_dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S") dt = file_dt.replace(tzinfo=timezone.utc) _focalplane.append(( dt, file_dt, { "fp_file": fpraw[ts]["fp"], "st_file": fpraw[ts]["st"], "ex_file": fpraw[ts]["ex"], "fp_data": None, "st_data": None, "ex_data": None, } ))
[docs]def get_focalplane_dates(): """Returns the dates of new focalplane definitions. There are two levels of time-dependent changes within the focalplane. First are the focalplane definitions, defined by a set of files ``$DESIMODEL/data/focalplane/{desi-exclusion,desi-focalplane,desi-state}_DATE.``. Those are the dates returned by this function, as a list of datetime objects. The second level is that, within the "state" table, there are changes to the states of individual positioners. (These are the dates returned when ``get_time_range=True`` is set in ``load_focalplane()``.) Returns ------- dates : :class:`list` of :class:`datetime` The dates when the focalplane changed. """ ensure_focalplane_loaded() global _focalplane return [dt for dt,fdt,v in _focalplane]
[docs]def load_focalplane(time=None, get_time_range=False): """Load the focalplane state that is valid for the given time. Parameters ---------- time : :class:`~datetime.datetime` The time to query with explicit timezone. Default to current time (:meth:`~datetime.datetime.now`) with timezone UTC. Returns ------- :class:`tuple` A tuple of (FP layout, exclusion polygons, state, time string). The FP layout is a Table. The exclusion polygons are a dictionary indexed by names that are referenced in the state. The state is a Table. The time string is the resulting UTC ISO format time string for the creation date of the FP model. If get_time_range=True, returns two additional values: time_low and time_high, both datetime objects giving the range of dates over which this description of the focal plane is valid. `time_high` may be None, indicating that there is no later known hardware state. In particular, these dates refer to the `state` of the positioners, which are more fine-grained than the `fp` and `exclusion` objects. """ # Time range over which this FP model is valid. time_lo = time_hi = None if time is None: time = datetime.now(tz=timezone.utc) elif time.tzinfo is None: msg = "Requested focalplane time '{}' is not timezone-aware. Assuming UTC.".format(time) log.warning(msg) time = time.replace(tzinfo=timezone.utc) # Convert requested time to UTC time = time.astimezone(tz=timezone.utc) # Load the global _focalplane cache if required ensure_focalplane_loaded() # Search the list of states for the most recent time that is before our # requested time. There should not be too many different states, or else # we are using the wrong format for storing these. Therefore, a linear # search should be fast enough. focalplane_props = None file_tmstr = None for dt, file_dt, props in _focalplane: if time >= dt: focalplane_props = props try: file_tmstr = file_dt.isoformat(timespec="seconds") except TypeError: # This must be python < 3.6, with no timespec option. # Since the focalplane time is read from the file name without # microseconds, the microseconds should be zero and so the # default return string will be correct. file_tmstr = file_dt.isoformat() time_lo = dt else: time_hi = dt break if focalplane_props is None: msg = "Cannot find focalplane for time {}".format(time) raise RuntimeError(msg) # Load the data if this is the first time working with this focalplane if focalplane_props["fp_data"] is None: focalplane_props["fp_data"] = Table.read( focalplane_props["fp_file"], format="ascii.ecsv" ) focalplane_props["st_data"] = Table.read( focalplane_props["st_file"], format="ascii.ecsv" ) if (focalplane_props['ex_file'].endswith('.json') or focalplane_props['ex_file'].endswith('.json.gz')): import json loadroutine = json.load else: loadroutine = yaml.safe_load try: # First try to load uncompressed with open(focalplane_props["ex_file"], "r") as f: focalplane_props["ex_data"] = loadroutine(f) except: # Must be gzipped with gzip.open(focalplane_props["ex_file"], "rb") as f: focalplane_props["ex_data"] = loadroutine(f) # Now "replay" the state up to our requested time. st_data = focalplane_props["st_data"] locstate = dict() # Believe it or not, parsing strings into dates takes a significant fraction of the compute # time here, so cache them! parsed_dates = {} for row in range(len(st_data)): datestr = st_data[row]["TIME"] tm = parsed_dates.get(datestr) if tm is None: # not cached, parse it! try: tm = datetime.strptime(datestr, "%Y-%m-%dT%H:%M:%S%z") except ValueError: # Old format with implicit UTC timezone tm = datetime.strptime(datestr, "%Y-%m-%dT%H:%M:%S") tm = tm.replace(tzinfo=timezone.utc) parsed_dates[datestr] = tm if tm <= time: loc = st_data[row]["LOCATION"] locstate[loc] = st_data[row] time_lo = tm else: time_hi = tm break rows = list() for loc in sorted(locstate.keys()): rows.append(locstate[loc]) state_data = Table(rows=rows, names=st_data.colnames) state_data.remove_column("TIME") rtn = ( focalplane_props["fp_data"], focalplane_props["ex_data"], state_data, file_tmstr ) if get_time_range: return rtn + (time_lo, time_hi) return rtn
[docs]def reset_cache(): """Reset I/O cache.""" global _thru, _psf, _params, _gfa, _fiberpos, _tiles, _platescale, _focalplane _thru = dict() _psf = dict() _params = None _gfa = None _fiberpos = None _tiles = dict() _platescale = None _focalplane = None
[docs]def load_target_info(): """Loads data/targets/targets.yaml and returns the nested dictionary. This is primarily syntactic sugar to avoid end users constructing paths and filenames by hand (which *e.g.* broke when targets.dat was renamed to targets.yaml). Returns ------- :class:`dict` The dictionary read from the YAML file. """ targetsfile = findfile("targets/targets.yaml") if not os.path.exists(targetsfile): targetsfile = findfile("targets/targets.dat") with open(targetsfile) as fx: data = yaml.safe_load(fx) return data
[docs]def load_pixweight(nside, pixmap=None): """Loads ``$DESIMODEL/data/footprint/desi-healpix-weights.fits``. Parameters ---------- nside : :class:`int` After loading, the array will be resampled to the passed HEALPix `nside`. pixmap : :class:`~astropy.io.fits.FITS_rec`, optional Input pixel weight map, already read from a weights file. Returns ------- Weight HEALPix weight map for the DESI footprint at the requested `nside`. """ import healpy as hp if pixmap is not None: log.debug("Using input pixel weight map of length {}.".format(len(pixmap))) else: # ADM read in the standard pixel weights file pixfile = findfile("footprint/desi-healpix-weights.fits") with fits.open(pixfile) as hdulist: pixmap = hdulist[0].data # ADM determine the file's nside, and flag a warning if the passed nside exceeds it npix = len(pixmap) truenside = hp.npix2nside(len(pixmap)) if truenside < nside: log.warning( "downsampling is fuzzy...Passed nside={}, but file {} is stored at nside={}".format( nside, pixfile, truenside ) ) # ADM resample the map return hp.pixelfunc.ud_grade(pixmap, nside, order_in="NESTED", order_out="NESTED")
[docs]def findfile(filename, surveyops=False): """Return full path to data file ``$DESIMODEL/data/filename``. Parameters ---------- filename : :class:`str` Name of the file, relative to the desimodel data directory. surveyops : :class:`bool` If ``True`` then find the relevant path for the $DESI_SURVEYOPS directory rather than the $DESIMODEL directory. Returns ------- :class:`str` The full path. Notes ----- This is a precursor for a potential future refactor where desimodel data would be installed with the package and :envvar:`DESIMODEL` would become an optional override. """ return os.path.join(datadir(surveyops), filename)
[docs]def datadir(surveyops=False): """Returns location to desimodel data. Parameters ---------- surveyops : :class:`bool` If ``True`` then find the relevant path for the $DESI_SURVEYOPS directory rather than the $DESIMODEL directory. Notes ----- If `surveyops`==``False`` and :envvar:`DESIMODEL` is set, then $DESIMODEL overrides data installed with the package. """ if surveyops: if "DESI_SURVEYOPS" in os.environ: surveyops = os.environ["DESI_SURVEYOPS"] # ADM test whether surveyops directory was checked out to trunk. if os.path.isdir(os.path.join(surveyops, "trunk", "ops")): surveyops = os.path.join(surveyops, "trunk") return os.path.abspath(os.path.join(surveyops, "ops")) # ADM raise a custom exception if $DESI_SURVEYOPS is not set. else: raise MissingEnvVar(f"$DESI_SURVEYOPS is not set") else: if "DESIMODEL" in os.environ: return os.path.abspath(os.path.join(os.environ["DESIMODEL"], "data")) else: import pkg_resources return pkg_resources.resource_filename("desimodel", "data")