# PyTransit: fast and easy exoplanet transit modelling in Python.
# Copyright (C) 2010-2019 Hannu Parviainen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from pathlib import Path
from typing import List, Union, Iterable
from astropy.stats import sigma_clip
from matplotlib.pyplot import subplots, setp
from numba import njit, prange
from numpy import (inf, sqrt, ones, zeros_like, concatenate, diff, log, ones_like, all,
clip, argsort, any, s_, zeros, arccos, nan, full, pi, sum, repeat, asarray, ndarray, log10,
array, atleast_2d, isscalar, atleast_1d, where, isfinite, arange, unique, squeeze, ceil, percentile,
floor, diag, nanstd)
from numpy.random import uniform, normal, permutation, multivariate_normal
from scipy.stats import norm
from .loglikelihood import WNLogLikelihood, CeleriteLogLikelihood
try:
from ldtk import LDPSetCreator
with_ldtk = True
except ImportError:
with_ldtk = False
from ..models.transitmodel import TransitModel
from ..orbits.orbits_py import duration_eccentric, as_from_rhop, i_from_ba, i_from_baew, d_from_pkaiews, epoch
from ..param.parameter import ParameterSet, PParameter, GParameter, LParameter
from ..param.parameter import UniformPrior as U, NormalPrior as N, GammaPrior as GM
from .. import QuadraticModel
from .logposteriorfunction import LogPosteriorFunction
@njit(cache=False)
def lnlike_normal(o, m, e):
return -sum(log(e)) -0.5*o.size*log(2.*pi) - 0.5*sum((o-m)**2/e**2)
@njit(cache=False)
def lnlike_normal_s(o, m, e):
return -o.size*log(e) -0.5*o.size*log(2.*pi) - 0.5*sum((o-m)**2)/e**2
@njit(parallel=True, cache=False, fastmath=True)
def lnlike_normal_v(o, m, e, wnids, lcids):
m = atleast_2d(m)
npv = m.shape[0]
npt = o.size
lnl = zeros(npv)
for i in prange(npv):
for j in range(npt):
k = wnids[lcids[j]]
lnl[i] += -log(e[i,k]) - 0.5*log(2*pi) - 0.5*((o[j]-m[i,j])/e[i,k])**2
return lnl
@njit(fastmath=True)
def map_pv(pv):
pv = atleast_2d(pv)
pvt = zeros((pv.shape[0], 7))
pvt[:,0] = sqrt(pv[:,4])
pvt[:,1:3] = pv[:,0:2]
pvt[:, 3] = as_from_rhop(pv[:,2], pv[:,1])
pvt[:, 4] = i_from_ba(pv[:,3], pvt[:,3])
return pvt
@njit(fastmath=True, cache=False)
def map_ldc(ldc):
ldc = atleast_2d(ldc)
uv = zeros_like(ldc)
a, b = sqrt(ldc[:,0::2]), 2.*ldc[:,1::2]
uv[:,0::2] = a * b
uv[:,1::2] = a * (1. - b)
return uv
[docs]class BaseLPF(LogPosteriorFunction):
_lpf_name = 'BaseLPF'
def __init__(self, name: str, passbands: list, times: list = None, fluxes: Iterable = None, errors: list = None,
pbids: list = None, covariates: list = None, wnids: list = None, tm: TransitModel = None,
nsamples: tuple = 1, exptimes: tuple = 0., init_data=True, result_dir: Path = None, tref: float = 0.0,
lnlikelihood: str = 'wn'):
"""The base Log Posterior Function class.
The `BaseLPF` class creates the basis for transit light curve analyses using `PyTransit`. This class can be
used in a basic analysis directly, or it can be inherited to create a basis for a more complex analysis.
Parameters
----------
name: str
Name of the log posterior function instance.
passbands: Iterable
List of unique passband names (filters) that the light curves have been observed in.
times: Iterable
List of 1d ndarrays each containing the mid-observation times for a single light curve.
fluxes: Iterable
List of 1d ndarrays each containing the normalized fluxes for a single light curve.
errors: Iterable
List of 1d ndarrays each containing the flux measurement uncertainties for a single light curvel.
pbids: Iterable of ints
List of passband indices mapping each light curve to a single passband.
covariates: Iterable
List of covariates one 2d narray per light curve.
wnids: Iterable of ints
List of noise set indices mapping each light curve to a single noise set.
tm: TransitModel
Transitmodel to use instead of the default model.
nsamples: list[int]
List of supersampling factors. The values should be integers and given one per light curve.
exptimes: list[float]
List of exposure times. The values should be floats with the time given in days.
init_data: bool
Set to `False` to allow the LPF to be initialized without data. This is mainly for debugging.
result_dir: Path, optional
Default saving directory
tref: float, optional
Reference time
"""
self._pre_initialisation()
super().__init__(name=name, result_dir=result_dir)
self.tm = tm or QuadraticModel(klims=(0.01, 0.75), nk=512, nz=512)
self._tref = tref
# Passbands
# ---------
# Passbands should be arranged from blue to red
if isinstance(passbands, (list, tuple, ndarray)):
self.passbands = passbands
else:
self.passbands = [passbands]
self.npb = npb = len(self.passbands)
self.nsamples = None
self.exptimes = None
self.lnlikelihood_type = lnlikelihood.lower()
# Declare high-level objects
# --------------------------
self._lnlikelihood_models = []
self._baseline_models = []
self.ps = None # Parametrisation
self.de = None # Differential evolution optimiser
self.sampler = None # MCMC sampler
self.instrument = None # Instrument
self.ldsc = None # Limb darkening set creator
self.ldps = None # Limb darkening profile set
self.cntm = None # Contamination model
# Declare data arrays and variables
# ---------------------------------
self.nlc: int = 0 # Number of light curves
self.n_noise_blocks: int = 0 # Number of noise blocks
self.noise_ids = None
self.times: list = None # List of time arrays
self.fluxes: list = None # List of flux arrays
self.errors: list = None # List of flux uncertainties
self.covariates: list = None # List of covariates
self.wn: ndarray = None # Array of white noise estimates for each light curve
self.timea: ndarray = None # Array of concatenated times
self.mfluxa: ndarray = None # Array of concatenated model fluxes
self.ofluxa: ndarray = None # Array of concatenated observed fluxes
self.errora: ndarray = None # Array of concatenated model fluxes
self.lcids: ndarray = None # Array of light curve indices for each datapoint
self.pbids: ndarray = None # Array of passband indices for each light curve
self.lcslices: list = None # List of light curve slices
if init_data:
# Set up the observation data
# ---------------------------
self._init_data(times = times, fluxes = fluxes, pbids = pbids, covariates = covariates,
errors = errors, wnids = wnids, nsamples = nsamples, exptimes = exptimes)
# Set up the parametrisation
# --------------------------
self._init_parameters()
# Inititalise the instrument
# --------------------------
self._init_instrument()
self._init_lnlikelihood()
self._init_baseline()
self._post_initialisation()
def _init_data(self, times: Union[List, ndarray], fluxes: Union[List, ndarray], pbids: Union[List, ndarray] = None,
covariates: Union[List, ndarray] = None, errors: Union[List, ndarray] = None, wnids: Union[List, ndarray] = None,
nsamples: Union[int, ndarray, Iterable] = 1, exptimes: Union[float, ndarray, Iterable] = 0.):
if isinstance(times, ndarray) and times.ndim == 1 and times.dtype == float:
times = [times]
if isinstance(fluxes, ndarray) and fluxes.ndim == 1 and fluxes.dtype == float:
fluxes = [fluxes]
if pbids is None:
if self.pbids is None:
self.pbids = zeros(len(fluxes), int)
else:
self.pbids = atleast_1d(pbids).astype('int')
self.nlc = len(times)
self.times = times
self.fluxes = fluxes
self.wn = [nanstd(diff(f)) / sqrt(2) for f in fluxes]
self.timea = concatenate(self.times)
self.ofluxa = concatenate(self.fluxes)
self.mfluxa = zeros_like(self.ofluxa)
self.lcids = concatenate([full(t.size, i) for i, t in enumerate(self.times)])
# TODO: Noise IDs get scrambled when removing transits, fix!!!
if wnids is None:
if self.noise_ids is None:
self.noise_ids = zeros(self.nlc, int)
self.n_noise_blocks = 1
else:
self.noise_ids = asarray(wnids)
self.n_noise_blocks = len(unique(self.noise_ids))
assert self.noise_ids.size == self.nlc, "Need one noise block id per light curve."
assert self.noise_ids.max() == self.n_noise_blocks - 1, "Error initialising noise block ids."
if isscalar(nsamples):
self.nsamples = full(self.nlc, nsamples)
self.exptimes = full(self.nlc, exptimes)
else:
assert (len(nsamples) == self.nlc) and (len(exptimes) == self.nlc)
self.nsamples = asarray(nsamples, 'int')
self.exptimes = asarray(exptimes)
self.tm.set_data(self.timea-self._tref, self.lcids, self.pbids, self.nsamples, self.exptimes)
if errors is None:
self.errors = [full(t.size, nan) for t in self.times]
else:
self.errors = errors
self.errora = concatenate(self.errors)
# Initialise the light curves slices
# ----------------------------------
self.lcslices = []
sstart = 0
for i in range(self.nlc):
s = self.times[i].size
self.lcslices.append(s_[sstart:sstart + s])
sstart += s
# Initialise the covariate arrays, if given
# -----------------------------------------
if covariates is not None:
self.covariates = covariates
for cv in self.covariates:
cv = (cv - cv.mean(0)) / cv.std(0)
#self.ncovs = self.covariates[0].shape[1]
#self.covsize = array([c.size for c in self.covariates])
#self.covstart = concatenate([[0], self.covsize.cumsum()[:-1]])
#self.cova = concatenate(self.covariates)
def _add_lnlikelihood_model(self, lnl):
self._lnlikelihood_models.append(lnl)
def _add_baseline_model(self, blm):
self._baseline_models.append(blm)
def _init_parameters(self):
self.ps = ParameterSet()
self._init_p_orbit()
self._init_p_planet()
self._init_p_limb_darkening()
self._init_p_baseline()
self.ps.freeze()
def _init_p_orbit(self):
"""Orbit parameter initialisation.
"""
porbit = [
GParameter('tc', 'zero_epoch', 'd', N(0.0, 0.1), (-inf, inf)),
GParameter('p', 'period', 'd', N(1.0, 1e-5), (0, inf)),
GParameter('rho', 'stellar_density', 'g/cm^3', U(0.1, 25.0), (0, inf)),
GParameter('b', 'impact_parameter', 'R_s', U(0.0, 1.0), (0, 1))]
self.ps.add_global_block('orbit', porbit)
def _init_p_planet(self):
"""Planet parameter initialisation.
"""
pk2 = [PParameter('k2', 'area_ratio', 'A_s', U(0.05**2, 0.2**2), (0, inf))]
self.ps.add_passband_block('k2', 1, 1, pk2)
self._pid_k2 = repeat(self.ps.blocks[-1].start, self.npb)
self._start_k2 = self.ps.blocks[-1].start
self._sl_k2 = self.ps.blocks[-1].slice
def _init_p_limb_darkening(self):
"""Limb darkening parameter initialisation.
"""
pld = concatenate([
[PParameter(f'q1_{pb}', 'q1 coefficient {pb}', '', U(0, 1), bounds=(0, 1)),
PParameter(f'q2_{pb}', 'q2 coefficient {pb}', '', U(0, 1), bounds=(0, 1))]
for i,pb in enumerate(self.passbands)])
self.ps.add_passband_block('ldc', 2, self.npb, pld)
self._sl_ld = self.ps.blocks[-1].slice
self._start_ld = self.ps.blocks[-1].start
def _init_p_baseline(self):
pass
def _init_p_noise(self):
pass
def _init_instrument(self):
pass
def _pre_initialisation(self):
pass
def _post_initialisation(self):
pass
def _init_lnlikelihood(self):
if self.lnlikelihood_type == 'wn':
self._add_lnlikelihood_model(WNLogLikelihood(self))
elif self.lnlikelihood_type == 'celerite':
self._add_lnlikelihood_model(CeleriteLogLikelihood(self))
else:
raise NotImplementedError
def _init_baseline(self):
pass
[docs] def create_pv_population(self, npop=50):
pvp = self.ps.sample_from_prior(npop)
# With LDTk
# ---------
#
# Use LDTk to create the sample if LDTk has been initialised.
#
if self.ldps:
istart = self._start_ld
cms, ces = self.ldps.coeffs_tq()
for i, (cm, ce) in enumerate(zip(cms.flat, ces.flat)):
pvp[:, i + istart] = normal(cm, ce, size=pvp.shape[0])
# No LDTk
# -------
#
# Ensure that the total limb darkening decreases towards
# red passbands.
#
else:
ldsl = self._sl_ld
for i in range(pvp.shape[0]):
pid = argsort(pvp[i, ldsl][::2])[::-1]
pvp[i, ldsl][::2] = pvp[i, ldsl][::2][pid]
pvp[i, ldsl][1::2] = pvp[i, ldsl][1::2][pid]
return pvp
[docs] def add_prior(self, prior):
self._additional_log_priors.append(prior)
[docs] def baseline(self, pv):
if self._baseline_models:
if pv.ndim == 1:
bl = ones_like(self.timea)
else:
bl = ones((pv.shape[0], self.timea.size))
for blm in self._baseline_models:
bl = blm(pv, bl)
return bl
else:
return 1.
[docs] def trends(self, pv):
"""Additive trends"""
return 0.
[docs] def transit_model(self, pv, copy=True):
pv = atleast_2d(pv)
ldc = map_ldc(pv[:,self._sl_ld])
zero_epoch = pv[:,0] - self._tref
period = pv[:,1]
smaxis = as_from_rhop(pv[:,2], period)
inclination = i_from_ba(pv[:,3], smaxis)
radius_ratio = sqrt(pv[:,4:5])
return self.tm.evaluate(radius_ratio, ldc, zero_epoch, period, smaxis, inclination)
[docs] def flux_model(self, pv):
baseline = self.baseline(pv)
trends = self.trends(pv)
model_flux = self.transit_model(pv)
return baseline * model_flux + trends
[docs] def residuals(self, pv):
return self.ofluxa - self.flux_model(pv)
[docs] def lnlikelihood(self, pvp):
"""Log likelihood for a 1D or 2D array of model parameters.
Parameters
----------
pvp: ndarray
Either a 1D parameter vector or a 2D parameter array.
Returns
-------
Log likelihood for the given parameter vector(s).
"""
fmodel = self.flux_model(pvp)
if pvp.ndim == 1:
lnl = 0.
else:
lnl = zeros(pvp.shape[0])
for lnlikelihood in self._lnlikelihood_models:
lnl += lnlikelihood(pvp, fmodel)
return lnl
[docs] def set_radius_ratio_prior(self, kmin, kmax):
"""Set a uniform prior on all radius ratios."""
for p in self.ps[self._sl_k2]:
p.prior = U(kmin ** 2, kmax ** 2)
[docs] def add_t14_prior(self, mean: float, std: float) -> None:
"""Add a normal prior on the transit duration.
Parameters
----------
mean: float
Mean of the normal distribution
std: float
Standard deviation of the normal distribution.
"""
def T14(pv):
pv = atleast_2d(pv)
a = as_from_rhop(pv[:, 2], pv[:, 1])
t14 = duration_eccentric(pv[:, 1], sqrt(pv[:, 4]), a, arccos(pv[:, 3] / a), 0, 0, 1)
return norm.logpdf(t14, mean, std)
self._additional_log_priors.append(T14)
[docs] def add_as_prior(self, mean: float, std: float) -> None:
"""Add a normal prior on the scaled semi-major axis :math:`(a / R_\star)`.
Parameters
----------
mean: float
Mean of the normal distribution.
std: float
Standard deviation of the normal distribution
"""
def as_prior(pv):
a = as_from_rhop(pv[2], pv[1])
return norm.logpdf(a, mean, std)
self._additional_log_priors.append(as_prior)
[docs] def add_ldtk_prior(self, teff: tuple, logg: tuple, z: tuple, passbands: tuple,
uncertainty_multiplier: float = 3, **kwargs) -> None:
"""Add a LDTk-based prior on the limb darkening.
Parameters
----------
teff
logg
z
passbands
uncertainty_multiplier
Returns
-------
"""
if 'pbs' in kwargs.keys():
raise DeprecationWarning("The 'pbs' argument has been renamed to 'passbands'")
if isinstance(passbands[0], str):
raise DeprecationWarning(
'Passing passbands by name has been deprecated, they should be now Filter instances.')
self.ldsc = LDPSetCreator(teff, logg, z, list(passbands))
self.ldps = self.ldsc.create_profiles(1000)
self.ldps.resample_linear_z()
self.ldps.set_uncertainty_multiplier(uncertainty_multiplier)
def ldprior(pv):
return self.ldps.lnlike_tq(pv[:, self._sl_ld].reshape([pv.shape[0], -1, 2]))
self._additional_log_priors.append(ldprior)
[docs] def remove_outliers(self, sigma=5):
fmodel = squeeze(self.flux_model(self.de.minimum_location))
covariates = [] if self.covariates is not None else None
times, fluxes, lcids, errors = [], [], [], []
for i in range(len(self.times)):
res = self.fluxes[i] - fmodel[self.lcslices[i]]
mask = ~sigma_clip(res, sigma=sigma).mask
times.append(self.times[i][mask])
fluxes.append(self.fluxes[i][mask])
if covariates is not None:
covariates.append(self.covariates[i][mask])
if self.errors is not None:
errors.append(self.errors[i][mask])
self._init_data(times=times, fluxes=fluxes, covariates=self.covariates, pbids=self.pbids,
errors=(errors if self.errors is not None else None), wnids=self.noise_ids,
nsamples=self.nsamples, exptimes=self.exptimes)
[docs] def remove_transits(self, tids):
m = ones(len(self.times), bool)
m[tids] = False
self._init_data(self.times[m], self.fluxes[m], self.pbids[m],
self.covariates[m] if self.covariates is not None else None,
self.errors[m], self.noise_ids[m], self.nsamples[m], self.exptimes[m])
self._init_parameters()
[docs] def posterior_samples(self, burn: int = 0, thin: int = 1, derived_parameters: bool = True):
df = super().posterior_samples(burn=burn, thin=thin)
if derived_parameters:
for k2c in df.columns[self._sl_k2]:
df[k2c.replace('k2', 'k')] = sqrt(df[k2c])
df['a'] = as_from_rhop(df.rho.values, df.p.values)
df['inc'] = i_from_baew(df.b.values, df.a.values, 0., 0.)
average_ks = sqrt(df.iloc[:, self._sl_k2]).mean(1).values
df['t14'] = d_from_pkaiews(df.p.values, average_ks, df.a.values, df.inc.values, 0., 0., 1, kind=14)
df['t23'] = d_from_pkaiews(df.p.values, average_ks, df.a.values, df.inc.values, 0., 0., 1, kind=23)
return df
[docs] def plot_light_curves(self, method='de', ncol: int = 3, width: float = 2., max_samples: int = 1000, figsize=None,
data_alpha=0.5, ylim=None):
nrow = int(ceil(self.nlc / ncol))
if method == 'mcmc':
df = self.posterior_samples(derived_parameters=False)
t0, p = df.tc.median(), df.p.median()
fmodel = self.flux_model(permutation(df.values)[:max_samples])
fmperc = percentile(fmodel, [50, 16, 84, 2.5, 97.5], 0)
else:
fmodel = squeeze(self.flux_model(self.de.minimum_location))
t0, p = self.de.minimum_location[0], self.de.minimum_location[1]
fmperc = None
fig, axs = subplots(nrow, ncol, figsize=figsize, constrained_layout=True, sharey='all', sharex='all',
squeeze=False)
for i in range(self.nlc):
ax = axs.flat[i]
e = epoch(self.times[i].mean(), t0, p)
tc = t0 + e * p
time = self.times[i] - tc
ax.plot(time, self.fluxes[i], '.', alpha=data_alpha)
if method == 'de':
ax.plot(time, fmodel[self.lcslices[i]], 'w', lw=4)
ax.plot(time, fmodel[self.lcslices[i]], 'k', lw=1)
else:
ax.fill_between(time, *fmperc[3:5, self.lcslices[i]], alpha=0.15)
ax.fill_between(time, *fmperc[1:3, self.lcslices[i]], alpha=0.25)
ax.plot(time, fmperc[0, self.lcslices[i]])
setp(ax, xlabel=f'Time - T$_c$ [d]', xlim=(-width / 2 / 24, width / 2 / 24))
setp(axs[:, 0], ylabel='Normalised flux')
if ylim is not None:
setp(axs, ylim=ylim)
for ax in axs.flat[self.nlc:]:
ax.remove()
return fig
def __repr__(self):
return f"Target: {self.name}\nLPF: {self._lpf_name}\n Passbands: {self.passbands}"