# Licensed under a 3-clause BSD style license - see LICENSE
'''
Provides the framework for calculating theoretical perturbations due to faint, unresolved
objects blended with brighter sources, under the assumption that sources are fit with PSF
photometry while in a sky background-dominated regime such that noise is constant across
the detector.
'''
import itertools
import os
import shutil
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import cm, gridspec
from matplotlib.colors import Normalize
from scipy.optimize import basinhopping, minimize
from scipy.special import erf # pylint: disable=no-name-in-module
from macauff.misc_functions import make_pool
# Assume that usetex = False only applies for tests where no TeX is installed
# at all, instead of users having half-installed TeX, dvipng et al. somewhere.
usetex = not not shutil.which("tex") # pylint: disable=unneeded-not
if usetex:
plt.rcParams.update({"text.usetex": True, "text.latex.preamble": r"\usepackage{amsmath}"})
__all__ = ['FitPSFPerturbations']
[docs]
class FitPSFPerturbations:
"""
Class that derives a parameterisation of the effect of a hidden, blended
contaminant within a brighter source in a photometric image, based on
fitting the composite object with a single Gaussian PSF in the limit that
sky background dominates and noise is constant across the image.
"""
def __init__(self, psf_fwhm, d_di, d_li, n_pool, data_save_folder, plot_save_folder=None):
"""
Initialise FitPSFPerturbations with necessary parameters.
Parameters
----------
psf_fwhm : float
The full-width at half-maximum of the PSF to simulate fitting
for perturbations.
d_di : float
Separation between perturber offsets, setting the precision of the
simulated perturbations.
d_li : float
Separation between relative perturber fluxes, affecting the
precision of the derived offset relations.
n_pool : integer
Number of parallel threads to use for ``multiprocessing`` calls.
data_save_folder : string
Relative or absolute path into which to save the data created by
the fitting process.
plot_save_folder : string
Relative or absolute path, used to save plots generated as part
of the PSF algorithm AUF perturbation parameters.
"""
self.psf_fwhm = psf_fwhm
self.psf_sig = self.psf_fwhm / (2 * np.sqrt(2 * np.log(2)))
# Set the maximum offset to be the Rayleigh criterion, assuming
# we would resolve the blended object outside of this all of the time.
self.di = np.arange(0, 1.185*self.psf_fwhm, d_di)
# Handle l=1 separately
self.li = np.arange(0.15, 1-d_li+1e-10, d_li)
self.n_pool = n_pool
self.data_save_folder = data_save_folder
self.plot_save_folder = plot_save_folder
if not os.path.exists(self.data_save_folder):
os.makedirs(self.data_save_folder)
if self.plot_save_folder is not None and not os.path.exists(self.plot_save_folder):
os.makedirs(self.plot_save_folder)
[docs]
def __call__(self, run_initial_ld, run_skew_fit, run_polynomial_fit, make_fit_plots,
draw_sim_num=500000):
"""
Call function for FitPSFPerturbations.
Parameters
----------
run_initial_ld : boolean
Flag for whether to run the initial fitting of perturbation at
various relative flux-distance combinations if data already exist.
run_skew_fit : boolean
Toggle for whether to always run the fitting of each relative flux's
perturbation as a function of perturber position with a skew normal.
run_polynomial_fit : boolean
Flag indicating whether to re-run fitting of skew normal parameters
as a function of relative perturber flux even if already run
previously.
make_fit_plots : boolean
Controls whether summary plot is created for parameterisation run.
draw_sim_num : integer, optional
Number of realisations to draw when plotting summary figure for
goodness-of-fit distributions.
"""
if self.plot_save_folder is None and not make_fit_plots:
raise ValueError("plot_save_folder cannot be None if plots are "
"to be made. Please specify a folder for plots "
"to be saved into.")
self.run_initial_ld = run_initial_ld
self.run_skew_fit = run_skew_fit
self.run_polynomial_fit = run_polynomial_fit
self.make_fit_plots = make_fit_plots
self.draw_sim_num = draw_sim_num
if self.run_initial_ld or not os.path.exists(f'{self.data_save_folder}/dd_ld.npy'):
self.make_initial_ld_perturbations()
if self.run_skew_fit or not os.path.exists(f'{self.data_save_folder}/dd_skew_pars.npy'):
self.fit_skew_distributions()
if self.run_polynomial_fit or not np.all(
[os.path.exists(f'{self.data_save_folder}/{q}.npy') for q in
['ns', 'dd_ic', 'dd_params', 'dd_params_full', 'l_cut']]):
self.fit_polynomial_parameterisations()
if self.make_fit_plots:
self.plot_fits()
[docs]
def make_initial_ld_perturbations(self):
"""
Derive full perturbations due to additional blended object within PSF
for a matrix of relative flux-positional offset combinations.
"""
# Save d and delta-d, derived "properly", for each l/d combo
dd = np.empty((len(self.li), len(self.di), 2), float)
ij = itertools.product(np.arange(len(self.li)), np.arange(len(self.di)))
iter_array = zip(ij, itertools.repeat([self.li, self.di, self.psf_sig]))
with make_pool(self.n_pool) as pool:
for return_values in pool.imap_unordered(
self.min_parallel_dd_fit, iter_array,
chunksize=int(len(self.li) * len(self.di)/self.n_pool)):
(i, j), res_xy = return_values
res = res_xy.x
dd[i, j, 0] = np.sqrt(res[0]**2 + res[1]**2)
dd[i, j, 1] = self.di[j]
pool.join()
np.save(f'{self.data_save_folder}/dd_ld.npy', dd)
[docs]
def min_parallel_dd_fit(self, iterable):
"""
Wrapper for minimisation routine used in fitting for positional
offset caused by fitting two blended sources with a single PSF model.
Parameters
----------
iterable : list
List of inputs from ``multiprocessing``, including indices into
perturber position and flux arrays, the perturber parameter
arrays, and the width of the Gaussian used in modelling the PSF.
Returns
-------
i : integer
The index into the relative flux array for this particular call.
j : integer
The perturber positional offset array index.
res : ~`scipy.optimize.OptimizeResult`
The `scipy` optimisation output.
"""
(i, j), (l, di, c) = iterable
l, d = l[i], di[j]
res = minimize(self.min_dd_fit_xy, x0=np.array([l, d * l / (1 + l), 0]),
args=(1, [d], [1e-4], [l], c),
jac=True, method='newton-cg', hess=self.hess_dd_fit_xy,
options={'xtol': 1e-15})
return (i, j), res
[docs]
def min_dd_fit_xy(self, p, l, xis, yis, lis, sig):
"""
Minimisation function for fitting one PSF model to two or more simulated
sources, including the first-order derivatives with respect to position
offset and flux brightening of the single PSF.
Parameters
----------
p : list
List of the current values of perturbation offset and
flux brightening caused by all perturbers hidden within the
central object's PSF.
l : float
Flux of the central object.
xis : numpy.ndarray
x-axis positions of all perturbers being fit with a single PSF.
yis : numpy.ndarray
Positions of perturbers in the opposing orthogonal axis.
lis : numpy.ndarray
Flux of perturbers relative to the central source flux ``l``.
sig : float
The Gaussian sigma of the PSFs being fit.
Returns
-------
numpy.ndarray
Negative log-likelihood and its derivative with respect to position
and flux offsets.
"""
dx, dy, dl = p
di_dd = np.array([xis - dx, yis - dy])
dd = np.array([dx, dy])
fx = ((l+dl) * np.sum(lis * self.psi(di_dd, sig)) +
l * (l+dl) * self.psi(dd, sig) - 0.5*(l+dl)**2)
dfx = np.array([(l+dl) * np.sum(lis * (xis - dx)/2/sig**2 * self.psi(di_dd, sig)) -
l * (l+dl) * dx/2/sig**2 * self.psi(dd, sig),
(l+dl) * np.sum(lis * (yis - dy)/2/sig**2 * self.psi(di_dd, sig)) -
l * (l+dl) * dy/2/sig**2 * self.psi(dd, sig),
-(l + dl) + np.sum(lis * self.psi(di_dd, sig)) + l * self.psi(dd, sig)])
return -1*fx, -1*dfx
[docs]
def hess_dd_fit_xy(self, p, l, xis, yis, lis, sig):
"""
Second-order derivatives of the fit of one PSF model to two or more
simulated sources, following the minimisation routine ``min_dd_fit_xy``.
Parameters
----------
p : list
List of the current values of perturbation offset and
flux brightening caused by all perturbers hidden within the
central object's PSF.
l : float
Flux of the central object.
xis : numpy.ndarray
x-axis positions of all perturbers being fit with a single PSF.
yis : numpy.ndarray
Positions of perturbers in the opposing orthogonal axis.
lis : numpy.ndarray
Flux of perturbers relative to the central source flux ``l``.
sig : float
The Gaussian sigma of the PSFs being fit.
Returns
-------
numpy.ndarray
Negative of the Hessian of the log-likelihood fit between one PSF
model and multiple injected blended sources.
"""
dx, dy, dl = p
di_dd = np.array([xis - dx, yis - dy])
dd = np.array([dx, dy])
dx2 = ((l+dl) * np.sum(lis * self.psi(di_dd, sig) * ((xis - dx)**2/2/sig**2 - 1)) / 2 /
sig**2 + l * (l+dl) * self.psi(dd, sig) * (dx**2 / 2 / sig**2 - 1) / 2 / sig**2)
dxdy = ((l+dl) * np.sum(lis * self.psi(di_dd, sig) * (xis - dx) * (yis - dy)/4/sig**4) +
l * (l+dl) * self.psi(dd, sig) * dx*dy / 4 / sig**4)
dy2 = ((l+dl) * np.sum(lis * self.psi(di_dd, sig) * ((yis - dy)**2/2/sig**2 - 1)) / 2 /
sig**2 + l * (l+dl) * self.psi(dd, sig) * (dy**2 / 2 / sig**2 - 1) / 2 / sig**2)
dxdl = np.sum(lis*(xis - dx)/2/sig**2 *
self.psi(di_dd, sig)) - l * dx/2/sig**2 * self.psi(dd, sig)
dydl = np.sum(lis*(yis - dy)/2/sig**2 *
self.psi(di_dd, sig)) - l * dy/2/sig**2 * self.psi(dd, sig)
dl2 = -1
return -1*np.array([[dx2, dxdy, dxdl], [dxdy, dy2, dydl], [dxdl, dydl, dl2]])
[docs]
def psi(self, x, sig):
r"""
Calculate Psi, the convolution of two PSFs :math:`\phi` (equation 2,
Plewa & Sari, 2018, MNRAS, 476, 4372).
Parameters
----------
x : numpy.ndarray
Separation at which to evaluate the phi convolution. The first
axis should contain the cartesian x- and y-axis positions, with
each unique object filling the subsequent axes.
sig : float
Gaussian sigma of the PSFs being convolved.
Returns
-------
numpy.ndarray
The convolution of two PSFs :math:`\phi` evaluated at `x`.
"""
return np.exp(-0.25 * np.sum(x**2, axis=0) / sig**2)
[docs]
def fit_skew_distributions(self):
"""
Function for fitting sets of position offsets as a function of perturber
position with skew-normal distributions, accounting for a break at small
offsets where relationship becomes linear, for each value of perturber
relative flux.
"""
dd = np.load(f'{self.data_save_folder}/dd_ld.npy')
dd_skew_pars = np.empty((len(self.li), 5), float)
# Fit each l distribution for its parameters as a function of d.
for i, li in enumerate(self.li):
_x, _y = dd[i, 1:, 1] / self.psf_sig, dd[i, 1:, 0] / self.psf_sig
slices = np.where(np.abs((_y - _x * li/(1 + li))/_y) > 0.01)[0]
if len(slices) == 0:
dd_skew_pars[i, :-1] = 0
dd_skew_pars[i, -1] = _x[-1] + 0.1
print(r'All derived perturbations within 1% of linear fit for '
rf'relative flux of {li[i]}. No skew-normal fit needed.')
continue
cutr = _x[slices[0]]
x = _x[slices[0]:]
y = _y[slices[0]:]
w = np.ones_like(x)
w[:10] = 100
n_pools = self.n_pool
n_overloop = 2
niters = 150
counter = np.arange(0, n_pools*n_overloop)
xy_step = 0.1
temp = 0.01
x0 = None
method = 'L-BFGS-B'
min_kwarg = {'method': method, 'args': (x, y, w, self.li[i]), 'jac': True}
iter_rep = itertools.repeat([min_kwarg, niters, x0, xy_step, temp])
iter_group = zip(counter, iter_rep)
res = None
min_val = None
with make_pool(n_pools) as pool:
for return_res in pool.imap_unordered(self.dd_fitting_wrapper, iter_group,
chunksize=n_overloop):
if min_val is None or return_res.fun < min_val:
res = return_res
min_val = return_res.fun
pool.join()
dd_skew_pars[i, :-1] = res.x
dd_skew_pars[i, -1] = cutr
# With degeneracy between T/sig/alpha, we need to ensure values are of
# the correct sign. T and sig should always be positive, so if negative
# their sign needs correcting. However, if sig is negative then alpha will
# also be of the wrong sign, to correct for a * (x - u) / c within the
# skew-normal CDF. Thus, if either T or sig is negative, correct all
# three values.
q = (dd_skew_pars[:, 3] < 0) | (dd_skew_pars[:, 0] < 0)
dd_skew_pars[q, 0] *= -1
dd_skew_pars[q, 2] *= -1
dd_skew_pars[q, 3] *= -1
np.save(f'{self.data_save_folder}/dd_skew_pars.npy', dd_skew_pars)
[docs]
def dd_fitting_wrapper(self, iterable):
"""
Wrapper for fitting the parameters for the skew-normal distribution
to a position offset-perturber position relationship of a particular
flux value of perturber, via basinhopping.
Parameters
----------
iterable : list
List of values passed through ``multiprocessing``, including the
index into the relative flux array, the keywords passed through
to `basinhopping` (containing the method used, the arguments to
pass to `basinhopping`, and things like whether to compute the
Jacobian), the number of basin hop iterations, starting argument
values, and basin hop step size and "temperature".
Returns
-------
res : ~`scipy.optimize.OptimizeResult`
The `scipy` optimisation output containing the best-fit
skew-normal parameters from all basins.
"""
def sum_one_skew(p, x, y, w, l):
"""
Calculate the weighted sum-of-square-residuals between the
skew-normal and input data values, as well as its derivative
with respect to the parameters of the skew-normal.
Parameters
----------
p : list
Input values of Gaussian sigma, mean, skewness, and amplitude
for this particular skew-normal.
x : numpy.ndarray
Perturber distance from central source.
y : numpy.ndarray
Perturbation offsets, as calculated from full model residual
fitting.
w : numpy.ndarray
Weights for each data point.
l : numpy.ndarray
The relative flux of the perturber.
Returns
-------
numpy.ndarray
Weighted sum of data-model fits and derivatives of the
goodness-of-fit parameter.
"""
f, df = self.fit_one_skew(p, x, l)
return np.sum((y - f)**2 * w), np.array([np.sum(-2 * w * (y - f) * q) for q in df])
rng = np.random.default_rng()
_, (min_kwarg, niters, x0, s, t) = iterable
if x0 is None:
# sigma, mu, alpha, T
x0 = [rng.uniform(0, 3), rng.uniform(0, 4), rng.uniform(-1, 1), rng.uniform(0, 1)]
res = basinhopping(sum_one_skew, x0, minimizer_kwargs=min_kwarg, niter=niters, T=t,
stepsize=s)
return res
[docs]
def fit_one_skew(self, p, x, l):
"""
Function used in the fitting of a skew-normal distribution, calculating
the value of the skew-normal and its derivative with respect to its
parameters at the given value.
Parameters
----------
p : list
List of skew-normal parameters, the Gaussian sigma, mean, skewness
and amplitude respectively.
x : numpy.ndarray
Input values at which to evaluate the skew-normal distribution.
l : float
Relative flux of perturber for which perturbation-position relations
are being fit with the skew-normal distribution.
Returns
-------
f : numpy.ndarray
The skew-normal distribution evaluated at all ``x``.
df : numpy.ndarray
The derivative of `f` with respect to its Gaussian sigma, mean,
skewness, and amplitude respectively.
"""
def calc_psi_pdf(x):
r"""
Probability density function :math:`\psi` of the standard normal
distribution, used in calculating the skew-normal distribution.
Parameters
----------
x : numpy.ndarray
Values at which to evaluate the standard normal distribution.
Returns
-------
numpy.ndarray
:math:`\psi` at every `x` value.
"""
return 1/np.sqrt(2 * np.pi) * np.exp(-0.5 * x**2)
def calc_psi_cdf(x):
r"""
Cumulative distribution function, :math:`\psi` integrated from
:math:`-\infty` to `x`, of the standard normal distribution, used
in calculating the skew-normal distribution.
Parameters
----------
x : numpy.ndarray
Values at which to evaluate the CDF of the standard normal
distribution.
Returns
-------
numpy.ndarray
:math:`\psi` integrated from :math:`-\infty` to `x`, at every
`x`.
"""
return 0.5 * (1 + erf(x / np.sqrt(2)))
c, u, a, t = p
x_ = (x - u) / c
psi_pdf = calc_psi_pdf(x_)
psi_pdf_skew = calc_psi_pdf(a * x_)
psi_cdf_skew = calc_psi_cdf(a * x_)
dx_dc = -(x - u) / c**2
dx_du = -1 / c
f = 2 * t / c * l * psi_pdf * psi_cdf_skew
dfdc = (2 * t / c * l * psi_pdf * (dx_dc * (-x_ * psi_cdf_skew +
psi_pdf_skew * a) - psi_cdf_skew/c))
dfdu = 2 * t / c * l * psi_pdf * dx_du * (-psi_cdf_skew * x_ + a * psi_pdf_skew)
dfda = 2 * t / c * l * psi_pdf * psi_pdf_skew * x_
dfdt = 2 / c * l * psi_pdf * psi_cdf_skew
df = np.array([dfdc, dfdu, dfda, dfdt])
return f, df
[docs]
def fit_polynomial_parameterisations(self):
"""
Function controlling the derivation of power law polynomial fits to
skew-normal distribution parameters as a function of the relative flux
of the perturber.
"""
dd = np.load(f'{self.data_save_folder}/dd_ld.npy')
dd_skew_pars = np.load(f'{self.data_save_folder}/dd_skew_pars.npy')
l_cut = [0.15, self.li[self.li < 0.75][np.argmin(dd_skew_pars[self.li < 0.75, 0])],
self.li[np.argmin(dd_skew_pars[:, 2])]]
ns = np.arange(4, 25)
dd_params = np.empty((len(ns), dd_skew_pars.shape[1], np.amax(ns), 2), float)
for k, q in enumerate([self.li <= l_cut[1], (self.li > l_cut[1]) & (self.li < l_cut[2])]):
ij = np.arange(len(ns))
iter_array = zip(ij, itertools.repeat([ns, dd_skew_pars[q, :], self.li[q],
self.di[-1]/self.psf_sig]))
with make_pool(self.n_pool) as pool:
for results in pool.imap_unordered(self.min_parallel_dd_param_fit, iter_array,
chunksize=max(1, int(len(ns)/self.n_pool))):
j, n, resses = results
for i, res in enumerate(resses):
dd_params[j, i, :n, k] = res
pool.join()
# Keep track of the total goodness-of-fit values across all di-li
# combinations, as well as the goodness-of-fits just for individual
# lis, across di.
dd_ic = np.zeros((len(ns), 2), float)
dd_x2s = np.empty((len(ns), len(self.li), 2), float)
for j, n in enumerate(ns):
dd_x2 = [0, 0]
for i, li in enumerate(self.li):
x = dd[i, :, 1] / self.psf_sig
y = dd[i, :, 0] / self.psf_sig
ddparams = self.return_ddparams(li, l_cut, dd_params, n, j)
q = (~np.isnan(x)) & (~np.isnan(y))
x2_w = np.sum((y[q] - self.dd_combined_fit(ddparams, x[q], li,
l_cut[2]))**2 / y[q]**2)
x2_nw = np.sum((y[q] - self.dd_combined_fit(ddparams, x[q], li, l_cut[2]))**2)
dd_x2[0] += x2_w
dd_x2[1] += x2_nw
dd_x2s[j, i, :] = [x2_w, x2_nw]
dd_ic[j, :] = dd_x2
np.save(f'{self.data_save_folder}/ns.npy', ns)
np.save(f'{self.data_save_folder}/dd_ic.npy', dd_ic)
np.save(f'{self.data_save_folder}/dd_x2s.npy', dd_x2s)
np.save(f'{self.data_save_folder}/dd_params_full.npy', dd_params)
np.save(f'{self.data_save_folder}/l_cut.npy', l_cut)
# Use the relative sum-of-square-residuals as the metric for deciding
# which polynomial order is the best, but with an AIC complexity factor.
n_ind = np.unravel_index(np.argmin(dd_ic[:, 0] + 2*ns), dd_ic[:, 1].shape)[0]
dd_params_final = dd_params[n_ind, :, :ns[n_ind]]
np.save(f'{self.data_save_folder}/dd_params.npy', dd_params_final)
[docs]
def min_parallel_dd_param_fit(self, iterable):
"""
Wrapper function for calculating the best-fit polynomial parameters
for describing skew-normal distribution parameters as a function of
relative perturber flux.
Parameters
----------
iterable : list
The parameters passed through ``multiprocessing``, including
index of polynomial order, the list of polynomial orders being
fit across all parallel threads, the set of skew-normal
distribution parameters at all fluxes, the fluxes evaluated for
perturbation offsets, and the maximum perturber position.
Returns
-------
j : integer
The index into the polynomial order array.
n : integer
The polynomial order being fit.
resses : list
A list containing the best-fit polynomial weights for each
parameters in the skew-normal distributuon (Gaussian sigma, mean,
skewness, amplitude) and linear-regime radius cutoff.
"""
def sum_poly(p, x, y):
"""
Computes the sum-of-squares goodness-of-fit of a polynomial to
some data, and returns it and its first-order derivative with
respect to the polynomial parameters.
Parameters
----------
p : list
The polynomial weights.
x : numpy.ndarray
Values at which to evaluate the polynomial
y : numpy.ndarray
The data points, each of which corresponds to an element in
``x``.
Returns
-------
numpy.ndarray
The sum-of-squares between the model and data `y`, as well as
the derivative of the sum-of-squares with respect to each
parameter in the list `p`.
"""
f, df = self.fit_poly(p, x)
return np.sum((y - f)**2), np.array([np.sum(-2 * (y - f) * q) for q in df])
j, (ns, dd_skew_pars, li, dcut) = iterable
n = ns[j]
resses = []
for i in range(dd_skew_pars.shape[1]):
# Filter for any "non-fit" fluxes, which are indicated by having
# a linear-regime cutoff radius larger than the maximum extent
# of dd probed.
q = dd_skew_pars[:, -1] < dcut
resses.append(minimize(sum_poly, x0=[0.5]*n,
args=(li[q], dd_skew_pars[q, i]),
jac=True, method='L-BFGS-B', options={'ftol': 1e-12}).x)
return j, n, resses
[docs]
def fit_poly(self, p, x):
r"""
Wrapper function for evaluating a polynomial with weights `p` at
all `x`, :math:`y_j = \sum_{i=0}^N p_i x_j^i`, and its derivative with
respect to each parameter `p`.
Parameters
----------
p : list or numpy.ndarray
The weights of the polynomial.
x : numpy.ndarray
Values at which to calculate the polynomial.
Returns
-------
y : numpy.ndarray
The evaluation of the polynomial with parameters `p` at
each `x`.
dy : list
The derivative of `y` with respect to each `p_i` parameter.
"""
x = np.atleast_1d(x)
y = np.empty((len(p), len(x)), float)
# y = sum_i=0^N p_i x**i
y[0, :] = p[0]
for i in np.arange(1, len(p)):
y[i, :] = y[i-1, :] * x / p[i-1] * p[i]
y = np.sum(y, axis=0)
dy = [np.ones_like(x)]
for _ in np.arange(1, len(p)):
dy.append(dy[-1] * x)
return y, dy
[docs]
def return_ddparams(self, li, l_cut, dd_params, n, n_ind):
"""
Convenience function for deriving skew-normal distribution parameters
as a function of perturber flux from fit polynomial distributions.
Parameters
----------
li : float
The flux of the perturber relative to the flux of the central source.
l_cut : list or numpy.ndarray
List of key cut-off relative fluxes, at which parameterisations of
perturber effects change.
dd_params : numpy.ndarray
Array of polynomial weights for each skew-normal distribution
parameter, for multiple polynomial orders, for parameterisations
above and below ``l_cut[1]``.
n : integer
Number of polynomial terms being used to calculate the skew-normal
distribution values.
n_ind : integer
Index of the chosen number of polynomial terms in ``dd_params``.
Returns
-------
dd_skew_params : numpy.ndarray
Values of the skew-normal sigma, mean, skewness, and amplitude,
evaluated at `li`, from a polynomial of order `N+1`.
"""
dd_skew_params = []
p = 0 if li <= l_cut[1] else 1
for q in range(dd_params.shape[1]):
dd_skew_params.append(self.fit_poly(dd_params[n_ind, q, :n, p], li)[0])
dd_skew_params = np.array(dd_skew_params)
return dd_skew_params
[docs]
def dd_combined_fit(self, p, x, l, l_cut):
"""
Wrapper function for calculating the combined description of the
perturber offsets as a linear relation up to a particular offset
and a skew-normal distribution beyond that.
Parameters
----------
p : list
The values of the skew-normal distribution (sigma, mean, skewness,
amplitude), as well as the cutoff radius inside which to follow
a linear relation.
x : numpy.ndarray
Values at which to evaluate the offset due to a blended perturber.
l : float
Relative flux of the perturber as compared with the central object.
l_cut : float
Cutoff relative flux, above which no skew-normal distribution is
used at all.
Returns
-------
y : numpy.ndarray
The composite perturbation function, linear within ``p[-1]`` and
a skew-normal distribution outside.
"""
if l >= l_cut:
return x * l / (l + 1)
y = np.empty_like(x)
q = np.where(x <= p[-1])
y[q] = x[q] * l / (l + 1)
q = np.where(x > p[-1])
y[q] = self.fit_one_skew(p[:-1], x[q], l)[0]
return y
[docs]
def plot_fits(self): # pylint: disable=too-many-statements
"""
Visualisation function, plotting the various parameterisations fit in
FitPSFPerturbations, enabling quality checks to be carried out.
"""
dd = np.load(f'{self.data_save_folder}/dd_ld.npy')
dd_skew_pars = np.load(f'{self.data_save_folder}/dd_skew_pars.npy')
ns = np.load(f'{self.data_save_folder}/ns.npy')
dd_ic = np.load(f'{self.data_save_folder}/dd_ic.npy')
dd_x2s = np.load(f'{self.data_save_folder}/dd_x2s.npy')
l_cut = np.load(f'{self.data_save_folder}/l_cut.npy')
dd_params_full = np.load(f'{self.data_save_folder}/dd_params_full.npy')
n_ind = np.unravel_index(np.argmin(dd_ic[:, 0] + 2*ns), dd_ic[:, 1].shape)[0]
plt.figure('figure', figsize=(40, 16))
gs = gridspec.GridSpec(2, 4)
norm = Normalize(vmin=self.li[0], vmax=self.li[len(self.li)-1])
ax = plt.subplot(gs[0, 0])
# Work backwards from maximum li to minimum in 0.05 li steps.
for i in range(len(self.li)-1, -1, -int(np.ceil(0.05/(self.li[1] - self.li[0])))):
x = dd[i, :, 1] / self.psf_sig
y = dd[i, :, 0] / self.psf_sig
q = (~np.isnan(x)) & (~np.isnan(y))
x, y = x[q], y[q]
ax.plot(x, y, ls='-', c=cm.viridis(norm(self.li[i]))) # pylint: disable=no-member
ddparams = self.return_ddparams(self.li[i], l_cut, dd_params_full,
ns[n_ind], n_ind)
ax.plot(x, self.dd_combined_fit(ddparams, x, self.li[i], l_cut[2]), ls='--',
c=cm.viridis(norm(self.li[i]))) # pylint: disable=no-member
if usetex:
ax.set_xlabel(r'$x / \sigma_\mathrm{PSF}$')
ax.set_ylabel(r'$\Delta x / \sigma_\mathrm{PSF}$')
else:
ax.set_xlabel(r'x / sigma_PSF')
ax.set_ylabel(r'Delta x / sigma_PSF')
ax = plt.subplot(gs[0, 1])
if usetex:
label_list = [r'$\sigma$', r'$\mu$', r'$\alpha$', r'$T$', r'$r_\mathrm{c}$']
else:
label_list = [r'sigma', r'mu', r'alpha', r'T', r'r_c']
for i, (label, c, c2) in enumerate(zip(label_list, ['k', 'r', 'b', 'g', 'purple'],
['gray', 'orange', 'aquamarine', 'olive', 'violet'])):
ax.plot(self.li, dd_skew_pars[:, i], ls='-', c=c, label=label)
q = self.li <= l_cut[1]
ax.plot(self.li[q], self.fit_poly(dd_params_full[n_ind, i, :ns[n_ind], 0],
self.li[q])[0], ls='--', c=c2, lw=4)
q = (self.li > l_cut[1]) & (self.li < l_cut[2])
ax.plot(self.li[q], self.fit_poly(dd_params_full[n_ind, i, :ns[n_ind], 1],
self.li[q])[0], ls='-.', c=c2, lw=4)
ax.legend(fontsize=14, ncol=2)
ax.set_xlabel('Relative perturber flux')
ax.set_ylabel('Perturbation terms')
ax = plt.subplot(gs[0, 2])
ax1 = ax.twinx()
ax.plot(ns, np.log10(dd_ic[:, 1]), ls='-', c='k')
ax.axvline(ns[n_ind], ls='-', c='k')
ax1.plot(ns, np.log10(dd_ic[:, 0]), ls='-', c='r')
ax.set_xlabel('Order of polynomial fit')
if usetex:
ax.set_ylabel(r'$\mathrm{log}_{10}(\sum_i (y_i - f(x_i))^2)$')
ax1.set_ylabel(r'$\mathrm{log}_{10}(\sum_i \frac{(y_i - f(x_i))^2}{y_i^2})$', c='r')
else:
ax.set_ylabel(r'log10(sum_i (y_i - f(x_i))**2)')
ax1.set_ylabel(r'log10(sum_i (y_i - f(x_i))**2 / y_i**2)', c='r')
ax = plt.subplot(gs[1, 0])
ax1 = ax.twinx()
ax.plot(self.li, np.log10(dd_x2s[n_ind, :, 1]), ls='-', c='k')
ax.plot(self.li, np.log10(dd_x2s[n_ind, :, 0]), ls='-', c='r')
ax.set_xlabel('Relative perturber flux')
if usetex:
ax.set_ylabel(r'$\mathrm{log}_{10}(\sum_i (y_i - f(x_i))^2)$')
ax1.set_ylabel(r'$\mathrm{log}_{10}(\sum_i \frac{(y_i - f(x_i))^2}{y_i^2})$', c='r')
else:
ax.set_ylabel(r'log10(sum_i (y_i - f(x_i))**2)')
ax1.set_ylabel(r'log10(sum_i (y_i - f(x_i))**2 / y_i**2)', c='r')
ax = plt.subplot(gs[1, 1])
max_perc_diff = np.empty_like(self.li)
for i, li in enumerate(self.li):
x = dd[i, 1:, 1] / self.psf_sig
y = dd[i, 1:, 0] / self.psf_sig
q = (~np.isnan(x)) & (~np.isnan(y))
x, y = x[q], y[q]
ddparams = self.return_ddparams(li, l_cut, dd_params_full, ns[n_ind], n_ind)
max_perc_diff[i] = np.amax(np.abs((self.dd_combined_fit(ddparams, x, li,
l_cut[2]) - y) / (y + 1e-5)))
ax.plot(self.li, max_perc_diff, ls='-', c='k')
ax.set_xlabel('Relative perturber flux')
if usetex:
ax.set_ylabel(r'Max $\lvert\frac{\Delta x_\mathrm{f} / \sigma_\mathrm{PSF} - '
r'\Delta x_\mathrm{t} / \sigma_\mathrm{PSF}}{\Delta x_\mathrm{t} / '
r'\sigma_\mathrm{PSF}}\lvert$')
else:
ax.set_ylabel(r'Max abs((Delta x_f / sigma_PSF - Delta x_t / sigma_PSF) / '
r'(Delta x_t / sigma_PSF))')
ax = plt.subplot(gs[1, 2])
max_abs_diff = np.empty_like(self.li)
for i, li in enumerate(self.li):
x = dd[i, 1:, 1] / self.psf_sig
y = dd[i, 1:, 0] / self.psf_sig
q = (~np.isnan(x)) & (~np.isnan(y))
x, y = x[q], y[q]
ddparams = self.return_ddparams(li, l_cut, dd_params_full, ns[n_ind], n_ind)
max_abs_diff[i] = np.amax(np.abs(self.dd_combined_fit(ddparams, x, li, l_cut[2]) - y))
ax.plot(self.li, max_abs_diff, ls='-', c='k')
ax.set_xlabel('Relative perturber flux')
if usetex:
ax.set_ylabel(r'Max $\lvert\Delta x_\mathrm{f} / \sigma_\mathrm{PSF} - '
r'\Delta x_\mathrm{t} / \sigma_\mathrm{PSF}\lvert$')
else:
ax.set_ylabel(r'Max abs(Delta x_f / sigma_PSF - Delta x_t / sigma_PSF)')
diff = np.empty((self.draw_sim_num, 6), float)
ij = np.arange(diff.shape[0])
iter_array = zip(ij, itertools.repeat([self.psf_fwhm, self.psf_sig, l_cut,
dd_params_full, ns[n_ind], n_ind]))
with make_pool(self.n_pool) as pool:
for results in pool.imap_unordered(self.loop_ind_fit, iter_array,
chunksize=int(diff.shape[0]/self.n_pool)):
i, dx, _li, x, ddparams = results
dx_fit = self.dd_combined_fit(ddparams, np.array([x]), _li, l_cut[2])
diff[i, 0] = np.amax(np.abs((dx_fit - dx)/(dx + 1e-3)))
diff[i, 1] = np.amax(np.abs(dx_fit - dx))
diff[i, 2] = _li
diff[i, 3] = x
diff[i, 4] = dx
diff[i, 5] = dx_fit[0]
pool.join()
ax = plt.subplot(gs[0, 3])
hist, bins = np.histogram(diff[:, 0], bins='auto')
ax.plot(bins, np.append(hist, 0), 'k-')
if usetex:
ax.set_xlabel(r'$\lvert\frac{\Delta x_\mathrm{f} / \sigma_\mathrm{PSF} - '
r'\Delta x_\mathrm{t} / \sigma_\mathrm{PSF}}{\Delta x_\mathrm{t} / '
r'\sigma_\mathrm{PSF}}\lvert$')
else:
ax.set_xlabel(r'Max abs((Delta x_f / sigma_PSF - Delta x_t / sigma_PSF) / '
r'(Delta x_t / sigma_PSF))')
ax.set_ylabel('N')
ax = plt.subplot(gs[1, 3])
hist, bins = np.histogram(diff[:, 1], bins='auto')
ax.plot(bins, np.append(hist, 0), 'k-')
if usetex:
ax.set_xlabel(r'$\lvert\Delta x_\mathrm{f} / \sigma_\mathrm{PSF} - '
r'\Delta x_\mathrm{t} / \sigma_\mathrm{PSF}\lvert$')
else:
ax.set_xlabel(r'Max abs(Delta x_f / sigma_PSF - Delta x_t / sigma_PSF)')
ax.set_ylabel('N')
plt.tight_layout()
plt.savefig(f'{self.plot_save_folder}/dd_params_visualisation.pdf')
[docs]
def loop_ind_fit(self, iterable):
"""
Wrapper function for fitting perturbation offset and flux brightening
of one PSF model fit to multiple unresolved objects, for arbitrary
relative flux and position.
Parameters
----------
iterable : list
List passed through ``multiprocessing``, containing the index of
the fit, as well as PSF FWHM and Gaussian sigma, maximum relative
flux at which skew-normal distributions still describe
perturbation offsets, the array of skew-normal parameters as a
function of relative flux, the polynomial order being fit, and
an index into the polynomial order array.
Returns
-------
i : integer
The index of the fit, looping through the number of simulations
to draw.
dx : float
The derived perturbation offset, normalised to the PSF sigma.
li : float
The randomly drawn relative flux of the perturbing object.
x : float
Position of the perturbing object relative to the central source.
ddparams : numpy.ndarray
The calculated skew-normal distribution parameters evaluated at
the simulated `li` for the chosen polynomial order.
"""
rng = np.random.default_rng()
i, (psf_fwhm, psf_sig, t_cut, dd_params, n, n_ind) = iterable
li = rng.uniform(0.15, 1)
x_ = 2 * psf_fwhm
while x_ > 1.185*psf_fwhm:
x_ = rng.rayleigh(scale=psf_sig)
res = minimize(self.min_dd_fit_xy, x0=np.array([li, x_ * li / (1 + li), 0]),
args=(1, [x_], [1e-4], [li], psf_sig),
jac=True, method='newton-cg', hess=self.hess_dd_fit_xy,
options={'xtol': 1e-15})
dx = np.sqrt(res.x[0]**2 + res.x[1]**2) / psf_sig
x = x_ / psf_sig
ddparams = self.return_ddparams(li, t_cut, dd_params, n, n_ind)
return i, dx, li, x, ddparams