# See LICENSE.rst for BSD 3-clause license info
# -*- coding: utf-8 -*-
"""
desimodel.weather
=================
Model of the expected weather conditions at KPNO during the DESI survey.
To generate a random time series of expected FWHM seeing in arcsecs and
atmospheric transparency, use, for example::

    n = 10000
    dt = 300  # seconds
    t = np.arange(n) * dt
    gen = np.random.RandomState(seed=123)
    seeing = sample_seeing(n, dt_sec=dt, gen=gen)
    transp = sample_transp(n, dt_sec=dt, gen=gen)

The resulting arrays are randomly sampled from models of the 1D probability
density and 2-point power spectral density derived from MzLS observations.
See `DESI-doc-3087
<https://desi.lbl.gov/DocDB/cgi-bin/private/ShowDocument?docid=3087>`__
for details.
Used by :mod:`surveysim.weather` for simulations of DESI observing and
survey strategy studies.
"""
from __future__ import print_function, division
import os
import datetime
import calendar
import numpy as np
import scipy.interpolate
import scipy.special
import astropy.table
[docs]def _seeing_fit_model(x):
"""Evalute the fit to MzLS seeing described in DESI-doc-3087.
"""
p = np.array([ 0.07511146, 0.44276671, 23.02442192, 38.07691498])
y = (1 + ((x - p[0]) / p[1]) ** 2) ** (-p[2]) * x ** p[3]
return y / (y.sum() * np.gradient(x))
def get_seeing_pdf(median_seeing=1.1, max_seeing=2.5, n=250):
    """Return PDF of FWHM seeing for specified clipped median value.

    Note that this is atmospheric seeing, not delivered image quality.
    The reference wavelength for seeing values is 6355A, in the r band,
    and the observed wavelength dependence in Dey & Valdes is closer to
    ``lambda ** (-1/15)`` than the ``lambda ** (-1/5)`` predicted by
    Kolmogorov theory. See DESI-doc-3087 for details.

    Scales the clipped MzLS seeing PDF in order to achieve the requested
    median value. Note that clipping is applied before scaling, so
    the output PDF is clipped at scale * max_seeing.

    Parameters
    ----------
    median_seeing : float
        Target FWHM seeing value in arcsec. Must be in the range [0.95, 1.30].
    max_seeing : float
        Calculate scaled median using unscaled values below this value.
    n : int
        Size of grid to use for tabulating the returned arrays.

    Returns
    -------
    tuple
        Tuple (fwhm, pdf) that tabulates pdf[fwhm]. Normalized so that
        ``np.sum(pdf * np.gradient(fwhm)) = 1``.

    Raises
    ------
    ValueError
        If the requested median is outside the range reachable with the
        tabulated scale factors.
    """
    # Tabulate the nominal (scale=1) seeing PDF, clipped at max_seeing.
    fwhm = np.linspace(0., max_seeing, n)
    pdf = _seeing_fit_model(fwhm)
    # Renormalize after clipping the tails at max_seeing.
    pdf /= (pdf.sum() * np.gradient(fwhm))
    cdf = np.cumsum(pdf)
    cdf /= cdf[-1]
    # Tabulate the median as a function of FWHM scale.
    scale = np.linspace(0.9, 1.4, 11)
    median = np.empty_like(scale)
    for i, s in enumerate(scale):
        # Invert the CDF at 0.5 on the scaled grid to find the median.
        median[i] = np.interp(0.5, cdf, s * fwhm)
    if median_seeing < median[0] or median_seeing > median[-1]:
        raise ValueError('Requested median is outside allowed range.')
    # Interpolate to find the scale factor that gives the requested median.
    s = np.interp(median_seeing, median, scale)
    # Scale the grid; divide the PDF by s to preserve unit normalization.
    return fwhm * s, pdf / s
def sample_timeseries(x_grid, pdf_grid, psd, n_sample, dt_sec=180., gen=None):
    """Sample a time series specified by a power spectrum and 1D PDF.

    The PSD should describe the temporal correlations of whitened samples.
    Generated samples will then be unwhitened to recover the input 1D PDF.
    See DESI-doc-3087 for details.

    Uses :func:`whiten_transforms_from_cdf`.

    Parameters
    ----------
    x_grid : array
        1D array of N increasing grid values covering the parameter range
        to sample from.
    pdf_grid : array
        1D array of N increasing PDF values corresponding to each x_grid.
        Does not need to be normalized.
    psd : callable
        Function of frequency in 1/days that returns the power-spectral
        density of whitened temporal fluctations to sample from. Will only be
        called for positive frequencies. Normalization does not matter.
    n_sample : int
        Number of equally spaced samples to generate.
    dt_sec : float
        Time interval between samples in seconds.
    gen : np.random.RandomState or None
        Provide an existing RandomState for full control of reproducible random
        numbers, or None for non-reproducible random numbers.

    Returns
    -------
    array
        1D array of n_sample values sampled from the input PDF with temporal
        correlations specified by the input PSD.

    Raises
    ------
    ValueError
        If x_grid is not strictly increasing or the grid shapes differ.
    """
    x_grid = np.array(x_grid)
    pdf_grid = np.array(pdf_grid)
    if not np.all(np.diff(x_grid) > 0):
        raise ValueError('x_grid values are not increasing.')
    if x_grid.shape != pdf_grid.shape:
        raise ValueError('x_grid and pdf_grid arrays have different shapes.')
    # Initialize random numbers if necessary.
    if gen is None:
        gen = np.random.RandomState()
    # Calculate the CDF, normalized so that cdf_grid[-1] = 1.
    cdf_grid = np.cumsum(pdf_grid)
    cdf_grid /= cdf_grid[-1]
    # Calculate whitening / unwhitening transforms.
    # NOTE(review): the returned transforms are never used below -- the
    # unwhitening is done inline via erf + interp at the end. Confirm whether
    # this call can be removed or is kept intentionally for validation.
    whiten, unwhiten = whiten_transforms_from_cdf(x_grid, cdf_grid)
    # Build a linear grid of frequencies present in the Fourier transform
    # of the requested time series. Frequency units are 1/day.
    dt_day = dt_sec / (24. * 3600.)
    df_day = 1. / (n_sample * dt_day)
    f_grid = np.arange(1 + (n_sample // 2)) * df_day
    # Tabulate the power spectral density at each frequency. The PSD
    # describes seeing fluctuations that have been "whitened", i.e., mapped
    # via a non-linear monotonic transform to have unit Gaussian probability
    # density.
    psd_grid = np.empty_like(f_grid)
    psd_grid[1:] = psd(f_grid[1:])
    # Force the mean to zero.
    psd_grid[0] = 0.
    # Force the variance to one.
    psd_grid[1:] /= psd_grid[1:].sum() * df_day ** 2
    # Generate random whitened samples in Fourier space. The real and
    # imaginary parts of each interior frequency bin are independent
    # unit Gaussians; DC and Nyquist bins are handled separately.
    n_psd = len(psd_grid)
    x_fft = np.ones(n_psd, dtype=complex)
    x_fft[1:-1].real = gen.normal(size=n_psd - 2)
    x_fft[1:-1].imag = gen.normal(size=n_psd - 2)
    x_fft *= np.sqrt(psd_grid) / (2 * dt_day)
    x_fft[0] *= np.sqrt(2)
    x = np.fft.irfft(x_fft, n_sample)
    # Un-whiten the samples to recover the desired 1D PDF: map each unit
    # Gaussian sample to its CDF value, then invert the target CDF.
    x_cdf = 0.5 * (1 + scipy.special.erf(x / np.sqrt(2)))
    return np.interp(x_cdf, cdf_grid, x_grid)
[docs]def _seeing_psd(freq):
"""Evaluate the 'chi-by-eye' fit of the seeing PSD described in
DESI-doc-3087.
"""
N, f0, a0, a1 = 8000, 0.10, 2.8, -1.1
return (N * (freq/f0)**a0 / (1 + (freq/f0)**a0) *
(freq/f0) ** a1 / (10 + (freq/f0) ** a1))
def sample_seeing(n_sample, dt_sec=180., median_seeing=1.1, max_seeing=2.5,
                  gen=None):
    """Generate a random time series of FWHM seeing values.

    See DESI-doc-3087 for details. Uses :func:`get_seeing_pdf`,
    :func:`_seeing_psd` and :func:`sample_timeseries`.

    Parameters
    ----------
    n_sample : int
        Number of equally spaced samples to generate.
    dt_sec : float
        Time interval between samples in seconds.
    median_seeing : float
        See :func:`get_seeing_pdf`.
    max_seeing : float
        See :func:`get_seeing_pdf`.
    gen : np.random.RandomState or None
        Provide an existing RandomState for full control of reproducible random
        numbers, or None for non-reproducible random numbers.

    Returns
    -------
    array
        1D array of randomly generated samples.
    """
    fwhm_grid, pdf_grid = get_seeing_pdf(median_seeing, max_seeing)
    return sample_timeseries(
        fwhm_grid, pdf_grid, _seeing_psd, n_sample, dt_sec, gen)
# Piecewise power-law model of the transparency distribution derived in
# DESI-doc-3087: cumulative probability reached by each component and the
# corresponding power-law exponent on [0, 1].
_transp_pdf_cum = np.array([0.06, 0.11, 1.0])
_transp_pdf_powers = np.array([0., 2.5, 35.])


def get_transp_pdf(n=250):
    """Return PDF of atmospheric transparency.

    Derived from MzLS observations, but corrected for dust accumulation and
    measurement error. See DESI-doc-3087 for details.

    Parameters
    ----------
    n : int
        Size of grid to use for tabulating the returned arrays.

    Returns
    -------
    tuple
        Tuple (transp, pdf) that tabulates pdf[transp]. Normalized so that
        ``np.sum(pdf * np.gradient(transp)) = 1``.
    """
    transp = np.linspace(0., 1., n)
    pdf = np.zeros_like(transp)
    last_c = 0.
    for c, p in zip(_transp_pdf_cum, _transp_pdf_powers):
        # Each component contributes a fraction (c - last_c) of the total
        # probability with normalized shape (p + 1) * transp ** p on [0, 1].
        pdf += (c - last_c) * np.power(transp, p) * (p + 1)
        last_c = c
    # Renormalize on the discrete grid.
    pdf /= pdf.sum() * np.gradient(transp)
    return transp, pdf
[docs]def _transp_psd(freq):
"""Evaluate the 'chi-by-eye' fit of the transparency PSD described in
DESI-doc-3087.
"""
N, f0, a0, a1 = 500, 1.5, 0.0, -1.5
return (N * (freq/f0)**a0 / (1 + (freq/f0)**a0) *
(freq/f0) ** a1 / (1 + (freq/f0) ** a1))
def sample_transp(n_sample, dt_sec=180., gen=None):
    """Generate a random time series of atmospheric transparency values.

    See DESI-doc-3087 for details. Uses :func:`get_transp_pdf`,
    :func:`_transp_psd` and :func:`sample_timeseries`.

    Parameters
    ----------
    n_sample : int
        Number of equally spaced samples to generate.
    dt_sec : float
        Time interval between samples in seconds.
    gen : np.random.RandomState or None
        Provide an existing RandomState for full control of reproducible random
        numbers, or None for non-reproducible random numbers.

    Returns
    -------
    array
        1D array of randomly generated samples.
    """
    transp_grid, pdf_grid = get_transp_pdf()
    return sample_timeseries(
        transp_grid, pdf_grid, _transp_psd, n_sample, dt_sec, gen)
def dome_closed_fractions(start_date, stop_date,
                          replay='Y2007,Y2008,Y2009,Y2010,Y2011,Y2012,Y2013,Y2014'):
    """Return dome-closed fractions for each night of the survey.

    Years can be replayed in any order. If the number of years to replay is less
    than the survey duration, they are repeated.

    Parameters
    ----------
    start_date : datetime.date
        Survey starts on the evening of this date.
    stop_date : datetime.date
        Survey stops on the morning of this date. Must be after ``start_date``.
    replay : str
        Comma-separated list of years to replay, identified by arbitrary strings
        that must match column names in the DESIMODEL weather history.

    Returns
    -------
    numpy array
        1D array of N probabilities between 0-1, where N is the number of nights
        spanned by the start and stop dates.

    Raises
    ------
    ValueError
        If the dates are out of order, a replay year is missing from the
        history, or the tabulated history is invalid.
    RuntimeError
        If the DESIMODEL environment variable is not set.
    """
    # Check the inputs.
    num_nights = (stop_date - start_date).days
    if num_nights <= 0:
        raise ValueError('Expected start_date < stop_date.')
    replay = replay.split(',')
    # Load tabulated daily weather history from $DESIMODEL.
    DESIMODEL = os.getenv('DESIMODEL')
    if DESIMODEL is None:
        # Fail with a clear message instead of an obscure TypeError in join().
        raise RuntimeError('DESIMODEL environment variable is not set.')
    path = os.path.join(DESIMODEL, 'data', 'weather', 'daily-2007-2017.csv')
    t = astropy.table.Table.read(path)
    # The history tabulates one dome-closed fraction per calendar day;
    # 365 rows, so leap days are not represented.
    if not len(t) == 365:
        raise ValueError('Invalid weather history length (expected 365).')
    years = t.colnames
    lostfracs = []
    for yr in replay:
        if yr not in years:
            raise ValueError('Replay year "{}" not in weather history.'.format(yr))
        lostfrac = t[yr].data
        # Each entry must be a valid probability.
        if not np.all((lostfrac >= 0) & (lostfrac <= 1)):
            raise ValueError('Invalid weather history for replay year "{}".'.format(yr))
        lostfracs.append(lostfrac)
    # Replay the specified years (with wrap-around if necessary),
    # overlaid on the actual survey dates.
    probs = np.zeros(num_nights)
    start = start_date
    for year_num, year in enumerate(range(start_date.year, stop_date.year + 1)):
        first = datetime.date(year=year, month=1, day=1)
        stop = datetime.date(year=year + 1, month=1, day=1)
        if stop > stop_date:
            stop = stop_date
        n = (stop - start).days
        if n == 0:
            break
        if calendar.isleap(year):
            # The tabulated history has only 365 entries, so drop one night
            # in leap years. NOTE(review): this leaves the final night of the
            # span at probability zero (rather than skipping Feb 29), and
            # day-of-year indexing after February is shifted by one in leap
            # years -- confirm this approximation is acceptable.
            n -= 1
        idx = (start - start_date).days
        jdx = (start - first).days
        lostfrac = lostfracs[year_num % len(replay)]
        probs[idx:idx + n] = lostfrac[jdx:jdx + n]
        start = stop
    return probs