Source code for macauff.fit_astrometry

# Licensed under a 3-clause BSD style license - see LICENSE
'''
Module for calculating corrections to astrometric uncertainties of photometric catalogues, by
fitting their AUFs and centroid uncertainties across ensembles of matches between well-understood
catalogue and one for which precisions are less well known.
'''
# pylint: disable=too-many-lines
# pylint: disable=duplicate-code

import datetime
import os
import shutil
import sys
import warnings

import matplotlib.pyplot as plt
import numpy as np
from astropy.coordinates import SkyCoord, match_coordinates_sky
from matplotlib import gridspec
from numpy.lib.format import open_memmap
from scipy.optimize import minimize
from scipy.special import factorial
from scipy.stats import binned_statistic, poisson

# Assume that usetex = False only applies for tests where no TeX is installed
# at all, instead of users having half-installed TeX, dvipng et al. somewhere.
usetex = not not shutil.which("tex")  # pylint: disable=unneeded-not
if usetex:
    plt.rcParams.update({"text.usetex": True, "text.latex.preamble": r"\usepackage{amsmath}"})

# pylint: disable=wrong-import-position,import-error,no-name-in-module
from macauff.galaxy_counts import create_galaxy_counts
from macauff.misc_functions import (
    convex_hull_area,
    create_densities,
    find_model_counts_corrections,
    generate_avs_inside_hull,
)
from macauff.misc_functions_fortran import misc_functions_fortran as mff
from macauff.parse_catalogue import apply_proper_motion
from macauff.perturbation_auf import (
    _calculate_magnitude_offsets,
    download_trilegal_simulation,
    make_tri_counts,
)
from macauff.perturbation_auf_fortran import perturbation_auf_fortran as paf

# pylint: enable=wrong-import-position,import-error,no-name-in-module

__all__ = ['AstrometricCorrections']


warnings.filterwarnings("ignore", message="ERFA function .* yielded")


def derive_astrometric_corrections(self, which):
    """
    Wrapper to set various parameters and call AstrometricCorrections,
    for either catalogue "a" or "b".

    Parameters
    ----------
    which : string
        Either 'a' or 'b', indicating which side catalogue to fit.
    """
    # Generate from current data: just need the singular chunk mid-points
    # and to leave all other parameters as they are.
    if len(getattr(self, f'{which}_auf_region_points')) > 1:
        warnings.warn(f"{which}_auf_region_points contains more than one AUF sampling point, but "
                      f"{which}_correct_astrometry is True. Check results carefully.")
    ax1_mids = np.array([getattr(self, f'{which}_auf_region_points')[0, 0]])
    ax2_mids = np.array([getattr(self, f'{which}_auf_region_points')[0, 1]])
    ax_dimension = 2
    a_npy_or_csv = 'csv'
    a_coord_or_chunk = 'chunk'
    t = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"{t} Rank {self.rank}, chunk {self.chunk_id}: Calculating catalogue '{which}' "
          "uncertainty corrections...")
    apply_pm = (getattr(self, f'{which}_apply_proper_motion') or
                getattr(self, f'{which}_ref_apply_proper_motion'))
    if apply_pm:
        pm_cols, pm_ref_epoch_or_index = [None, None], [None, None]
        pm_move_to_epoch = self.move_to_epoch
        if getattr(self, f'{which}_apply_proper_motion'):
            pm_cols[0] = getattr(self, f'{which}_pm_indices')
            pm_ref_epoch_or_index[0] = getattr(self, f'{which}_ref_epoch_or_index')
        if getattr(self, f'{which}_ref_apply_proper_motion'):
            pm_cols[1] = getattr(self, f'{which}_ref_pm_indices')
            pm_ref_epoch_or_index[1] = getattr(self, f'{which}_ref_ref_epoch_or_index')
    else:
        pm_cols, pm_ref_epoch_or_index, pm_move_to_epoch = None, None, None
    ac = AstrometricCorrections(
        getattr(self, f'{which}_psf_fwhms'), self.num_trials, getattr(self, f'{which}_nn_radius'),
        getattr(self, f'{which}_dens_dist'), getattr(self, f'{which}_correct_astro_save_folder'),
        getattr(self, f'{which}_gal_wavs'), getattr(self, f'{which}_gal_aboffsets'),
        getattr(self, f'{which}_gal_filternames'), getattr(self, f'{which}_gal_al_avs'), self.d_mag,
        getattr(self, 'dd_params'), getattr(self, 'l_cut'), ax1_mids, ax2_mids,
        ax_dimension, getattr(self, f'{which}_correct_mag_array'),
        getattr(self, f'{which}_correct_mag_slice'), getattr(self, f'{which}_correct_sig_slice'), self.n_pool,
        a_npy_or_csv, a_coord_or_chunk, getattr(self, f'{which}_pos_and_err_indices'),
        getattr(self, f'{which}_mag_indices'), getattr(self, f'{which}_snr_indices'),
        getattr(self, f'{which}_filt_names'), getattr(self, f'{which}_correct_astro_mag_indices_index'),
        getattr(self, f'{which}_auf_region_frame'), getattr(self, f'{which}_saturation_magnitudes'),
        trifilepath=getattr(self, f'{which}_auf_file_path'),
        maglim_f=getattr(self, f'{which}_tri_maglim_faint'), magnum=getattr(self, f'{which}_tri_filt_num'),
        tri_num_faint=getattr(self, f'{which}_tri_num_faint'),
        trifilterset=getattr(self, f'{which}_tri_set_name'),
        trifiltnames=getattr(self, f'{which}_tri_filt_names'),
        tri_dens_cube=getattr(self, f'{which}_tri_dens_cube'),
        tri_dens_array=getattr(self, f'{which}_tri_dens_array'),
        use_photometric_uncertainties=getattr(self, f'{which}_use_photometric_uncertainties'),
        pregenerate_cutouts=True, chunks=[self.chunk_id], n_r=self.real_hankel_points,
        n_rho=self.four_hankel_points, max_rho=self.four_max_rho,
        mn_fit_type=getattr(self, f'{which}_mn_fit_type'), apply_proper_motion_flag=apply_pm,
        pm_indices=pm_cols, pm_ref_epoch_or_index=pm_ref_epoch_or_index, pm_move_to_epoch=pm_move_to_epoch)
    ac(a_cat_name=getattr(self, f'{which}_ref_cat_csv_file_path'),
       b_cat_name=getattr(self, f'{which}_cat_csv_file_path'),
       tri_download=getattr(self, f'{which}_download_tri'), make_plots=True, overwrite_all_sightlines=True,
       seeing_ranges=getattr(self, f'{which}_seeing_ranges'))


# pylint: disable=too-many-instance-attributes,too-many-statements,too-many-locals
# pylint: disable=too-many-arguments,too-many-positional-arguments
[docs] class AstrometricCorrections: """ Class to calculate any potential corrections to quoted astrometric precisions in photometric catalogues, based on reliable cross-matching to a well-understood second dataset. """ def __init__(self, psf_fwhms, numtrials, nn_radius, dens_search_radius, save_folder, gal_wavs_micron, gal_ab_offsets, gal_filtnames, gal_alavs, dm, dd_params, l_cut, ax1_mids, ax2_mids, ax_dimension, mag_arrays, mag_slices, sig_slices, n_pool, npy_or_csv, coord_or_chunk, pos_and_err_indices, mag_indices, snr_indices, mag_names, correct_astro_mag_indices_index, coord_system, saturation_magnitudes, pregenerate_cutouts, n_r, n_rho, max_rho, mn_fit_type, trifilepath=None, maglim_f=None, magnum=None, tri_num_faint=None, trifilterset=None, trifiltnames=None, tri_dens_cube=None, tri_dens_array=None, use_photometric_uncertainties=False, cutout_area=None, cutout_height=None, single_sided_auf=True, chunks=None, return_nm=False, apply_proper_motion_flag=False, pm_indices=None, pm_ref_epoch_or_index=None, pm_move_to_epoch=None): """ Initialisation of AstrometricCorrections, accepting inputs required for the running of the optimisation and parameterisation of astrometry of a photometric catalogue, benchmarked against a catalogue of much higher astrometric resolution and precision, such as Gaia or the Hubble Source Catalog. Parameters ---------- psf_fwhms : list or numpy.ndarray of floats The full-widths at half-maximum of the Point Spread Functions, used to determine the sizes of the PSF for perturber placement purposes. numtrials : integer Number of simulations to run when deriving pertubation statistics. nn_radius : float Size of nearest-neighbour search for construction of intermediate cross-match distributions, in arcseconds. dens_search_radius : float Radius out to which to search around objects internal to a catalogue, to determine the local normalising density for each source, in degrees. save_folder : string Absolute or relative filepath of folder into which to store temporary and generated outputs from the fitting process. gal_wavs_micron : list or numpy.ndarray of floats Wavelength, in microns, of the chosen filters, for use in simulating galaxy counts. gal_ab_offsets : list or numpy.ndarray of floats The offsets between the filter zero points and the AB magnitude offset. gal_filtnames : list or numpy.ndarray of strings Names of the filter in the ``speclite`` compound naming convention. gal_alavs : list or numpy.ndarray of floats Differential reddening vector of the given filters. dm : float Bin spacing for magnitude histograms of TRILEGAL simulations. dd_params : numpy.ndarray Array, of shape ``(5, X, 2)``, containing the parameterisation of the skew-normal used to construct background-dominated PSF perturbations. l_cut : numpy.ndarray Array of shape ``(3,)`` containing the cuts between the different regimes of background-dominated PSF perturbation. ax1_mids : numpy.ndarray Array of longitudes (e.g. RA or l) used to center regions used to determine astrometric corrections across the sky. Depending on ``ax_correction``, either the unique values that with ``ax2_mids`` form a rectangle, or a unique ax-ax combination with a corresponding ``ax2_mid``. ax2_mids : numpy.ndarray Array of latitudes (Dec/b) defining regions for calculating astrometric corrections. Either unique rectangle values to combine with ``ax1_mids`` or unique ``ax1_mids``-``ax2_mids`` pairs, one per entry. ax_dimension : integer, either ``1`` or ``2`` If ``1`` then ``ax1_mids`` and ``ax2_mids`` form unique sides of a rectangle when combined in a grid, or if ``2`` each ``ax1_mids``-``ax2_mids`` combination is a unique ax-ax pairing used as given. mag_arrays : list of numpy.ndarrays or numpy.ndarray List of lists of magnitudes in the filter to derive astrometry for, with each element of ``mag_arrays[i]`` a list or array of magnitudes at which to take cuts in the corresponding ``mag_names[i]`` filter. mag_slices : list of numpy.ndarrays or numpy.ndarray Widths of interval at which to take slices of magnitudes for deriving astrometric properties. Each ``mag_slices[i]`` maps elementwise to each ``mag_array[i]``, and hence they should be the same shape, with the two lists or arrays meeting ``len(mag_arrays) == len(mag_slices)``. sig_slices : list of numpy.ndarrays or numpy.ndarray Interval widths of quoted astrometric uncertainty to use when isolating individual sets of objects for AUF derivation. Length should match ``mag_array``. List must have the same number of elements as ``mag_arrays``, with each list-element agreeing in length as well; if a numpy array, then ``np.all(mag_arrays.shape == sig_slices.shape)`` must be true. n_pool : integer The maximum number of threads to use when calling ``multiprocessing``. npy_or_csv : string, either ``npy`` or ``csv`` Indicator as to whether the small chunks of sky to be loaded for each sightline's evaluation are in binary ``numpy`` format or saved to disk as a comma-separated values file. coord_or_chunk : string, either ``coord`` or ``chunk`` String indicating whether intermediate files should be saved with filenames that are unique by two coordinates (l/b or RA/Dec) or some kind of singular "chunk" number. Output filenames would then need to follow ``'file_{}{}'`` or ``'file_{}'`` formatting respectively. pos_and_err_indices : list of list or numpy.ndarray of integers In order, the indices within catalogue "b" and then "a" respecetively of either the .npy or .csv file of the longitudinal (e.g. RA or l), latitudinal (Dec or b), and *singular*, circular astrometric precision array. Coordinates should be in degrees while precision should be in the same units as ``sig_slice`` and those of the nearest-neighbour distances, likely arcseconds. Note that coordinates are reversed between calogues: for example, ``[[6, 3, 0], [0, 1, 2]]`` is the case where catalogue "a" has its coordinates in the first three columns in RA/Dec/Err order, while catalogue "b" has its coordinates in a more random order. If ``use_photometric_uncertainties`` is ``True`` then the third index of the first triplet (i.e., the index for the to-be-fit catalogue's uncertainties) should be that of a photometric uncertainty rather than an astrometric uncertainty. mag_indices : list or numpy.ndarray In appropriate order, as expected by e.g. `~macauff.CrossMatch` inputs and `~macauff.make_perturb_aufs`, list the indexes of each magnitude column within either the ``.npy`` or ``.csv`` file loaded for each sub-catalogue sightline. These should be zero-indexed. snr_indices : list or numpy.ndarray For each ``mag_indices`` entry, the corresponding signal-to-noise ratio index in the catalogue. mag_names : list or numpy.ndarray of strings Names of each ``mag_indices`` magnitude. correct_astro_mag_indices_index : integer Index into ``mag_indices`` of the preferred magnitude to use to construct astrometric scaling relations from. Should generally be the one with the most coverage across all detections in a catalogue, or the one with the most precise fluxes. coord_system : string, "equatorial" or "galactic" Identifier of which coordinate system the data are in. Both datasets must be in the same system, which can either be RA/Dec (equatorial) or l/b (galactic) coordinates. saturation_magnitudes : list or numpy.ndarray List of magnitudes brighter than which the given survey suffers saturation, each element matching the filter of ``mag_indices``. pregenerate_cutouts : boolean or None Indicates whether sightline catalogues must have been pre-made, or whether they can be generated by ``AstrometricCorrections`` using specified lines-of-sight and cutout areas and heights. If ``None`` then in-memory catalogues must be passed in the call signature of ``AstrometricCorrections``. n_r : integer Number of elements to generate for real-space evaluation of Bessel functions, used in the convolution of AUF PDfs. n_rho : integer Number of elements to generate for fourier-space Bessel function evaluation, used in the convolution of AUF PDfs. max_rho : float Largest fourier-space value to integrate functions to during the convolution of AUF PDFs. mn_fit_type : string Determines whether we perform quadratic or linear scaling for hyper-parameter fits to data-driven vs quoted astrometric uncertainties. Must either be "quadratic" or "linear." trifilepath : string, optional Filepath of the location into which to save TRILEGAL simulations. If provided ``tri_dens_cube`` and ``tri_dens_array`` must be ``None``, and ``maglim_fs``, ``magnums``, ``tri_num_faints``, ``trifilterset``, and ``trifiltnames`` must be given. Must contain two format ``{}`` options in string, for unique ax1-ax2 sightline combination downloads. maglim_f : float, optional Magnitude in the ``magnum`` filter down to which sources should be drawn for the "faint" sample. Should be ``None`` if ``tri_dens_cube`` and ``tri_dens_array`` are provided. magnum : float, optional Zero-indexed column number of the chosen filter's limiting magnitude. Should be ``None`` if ``tri_dens_cube`` and ``tri_dens_array`` are provided. tri_num_faint : integer, optional Approximate number of objects to simulate in the chosen filter for TRILEGAL simulations. Should be ``None`` if ``tri_dens_cube`` and ``tri_dens_array`` are provided. trifilterset : string, optional Name of the TRILEGAL filter set for which to generate simulations. Should be ``None`` if ``tri_dens_cube`` and ``tri_dens_array`` are provided. trifiltnames : list of string, optional Name of the specific filters to generate perturbation AUF component in. Should be ``None`` if ``tri_dens_cube`` and ``tri_dens_array`` are provided. tri_dens_cube : numpy.ndarray or None, optional If given, array of differential source densities, per square degree per magnitude, along with magnitude bins and intervals, for a set of given filters and sky positions, as computed by `~macauff.make_tri_counts`. Must be provided if ``trifilepath`` is ``None``, else must itself be ``None``. tri_dens_array : numpy.ndarray or None, optional Corresponding sky-coordinate array to extract the relevant column from ``tri_dens_cube``. Must be given if ``tri_dens_cub`` is provided, else ``None``. use_photometric_uncertainties : boolean, optional Flag for whether or not to use the photometric uncertainties instead of astrometric uncertainties when deriving astrometric uncertainties as a function of input precision. Defaults to False (use astrometric uncertainties). cutout_area : float, optional The size, in square degrees, of the regions used to simulate AUFs and determine astrometric corrections. Required if ``pregenerate_cutouts`` is ``False``. cutout_height : float, optional The latitudinal height of the rectangular regions used in calculating astrometric corrections. Required if ``pregenerate_cutouts`` is ``False``. single_sided_auf : boolean, optional Flag indicating whether the AUF of catalogue "a" can be ignored when considering match statistics, or if astrometric corrections are being constructed from matches to a catalogue that also suffers significant non-noise-based astrometric uncertainty. chunks = list or numpy.ndarray of strings, optional List of IDs for each unique set of data if ``coord_or_chunk`` is ``chunk``. In this case, ``ax_dimension`` must be ``2`` and each ``chunk`` must correspond to its ``ax1_mids``-``ax2_mids`` coordinate. return_nm : boolean, optional Flag for whether the output correction arrays ``m`` and ``n`` should be saved to disk (``False``) or returned by the function (``True``). apply_proper_motion_flag : boolean, optional Flag to indicate if either dataset has proper motions that must be applied to its positions before determining astrometric corrections. Defaults to False, in which case neither dataset will check for, or load, proper motion-based data. pm_indices : list of list or numpy.ndarray of integers, optional If given, must contain a list of two elements, each of which contains the two orthogonal sky-axis proper motions' indices for the given dataset, in the same reference frame as the positions to be loaded along with positions, SNRs, photometry, as necessary. As this is on a per-catalogue basis, if either (or both) dataset has no motion information, ``None`` must be given, with the first element of the list being, to be consistent with ``pos_and_err_indices``, the "b", to-be-fit, catalogue, with the second element the "a", "truth" dataset. We could have e.g. ``[None, [4, 5]]`` for the case where we apply motion information to our truth catalogue, with proper motion information being stored in the 5th and 6th columns. pm_ref_epoch_or_index : list of strings or integers, optional If provided, necessary when ``pm_indices`` is passed, each element, consistent with ``pm_indices`` catalogue ordering, must either be a single string, valid for loading into astropy's Time function, representing a common date of observation for all sources in the file or a single integer, loading from the dataset the astropy Time-valid strings for each object individually as a column of the input file. Again, if no motions are to be applied for an individual catalogue but ``apply_proper_motion_flag`` is ``True``, then ``None`` must be given for that list element; for the above example we could either supply ``[None, 6]`` or ``[None, 'J2000']`` to load per-source epochs from our file or supply a fixed observation date respectively. pm_move_to_epoch : string, optional If ``pm_indices`` is provided this must be given, containing a single, astropy Time valid, string, the final epoch to apply proper motions of all objects to. This must be a single string, rather than a per-catalogue list, to ensure that a common epoch is used when proper motions are to be applied to both datasets. """ if single_sided_auf is not True: raise ValueError("single_sided_auf must be True.") if ax_dimension not in (1, 2): raise ValueError("ax_dimension must either be '1' or '2'.") if npy_or_csv not in ("npy", "csv"): raise ValueError("npy_or_csv must either be 'npy' or 'csv'.") if coord_or_chunk not in ("coord", "chunk"): raise ValueError("coord_or_chunk must either be 'coord' or 'chunk'.") if coord_or_chunk == "chunk" and chunks is None: raise ValueError("chunks must be provided if coord_or_chunk is 'chunk'.") if coord_or_chunk == "chunk" and ax_dimension == 1: raise ValueError("ax_dimension must be 2, and ax1-ax2 pairings provided for each chunk " "in chunks if coord_or_chunk is 'chunk'.") if coord_or_chunk == "chunk" and (len(ax1_mids) != len(chunks) or len(ax2_mids) != len(chunks)): raise ValueError("ax1_mids, ax2_mids, and chunks must all be the same length if " "coord_or_chunk is 'chunk'.") if coord_system not in ("equatorial", "galactic"): raise ValueError("coord_system must either be 'equatorial' or 'galactic'.") if (pregenerate_cutouts is not None and pregenerate_cutouts is not True and pregenerate_cutouts is not False): raise ValueError("pregenerate_cutouts should either be 'None', 'True' or 'False'.") if pregenerate_cutouts is False and cutout_area is None: raise ValueError("cutout_area must be given if pregenerate_cutouts is 'False'.") if pregenerate_cutouts is False and cutout_height is None: raise ValueError("cutout_height must be given if pregenerate_cutouts is 'False'.") if use_photometric_uncertainties is not True and use_photometric_uncertainties is not False: raise ValueError("use_photometric_uncertainties must either be True or False.") if return_nm is not True and return_nm is not False: raise ValueError("return_nm must either be True or False.") if mn_fit_type not in ("quadratic", "linear"): raise ValueError("mn_fit_type must either be 'quadratic' or 'linear'.") if apply_proper_motion_flag and np.any([q is None for q in [pm_indices, pm_ref_epoch_or_index, pm_move_to_epoch]]): raise ValueError("apply_proper_motion_flag cannot be True without supplying pm_indices, " "pm_ref_epoch_or_index, and pm_move_to_epoch.") self.return_nm = return_nm self.psf_fwhms = psf_fwhms self.numtrials = numtrials self.nn_radius = nn_radius self.dens_search_radius = dens_search_radius # Currently hard-coded as it isn't useful for this work, but is required # for calling the perturbation AUF algorithms. self.dmcut = [2.5] self.n_r, self.max_rho, self.n_rho = n_r, max_rho, n_rho self.save_folder = save_folder self.trifilepath = trifilepath self.maglim_f = maglim_f self.magnum = magnum self.tri_num_faint = tri_num_faint self.trifilterset = trifilterset self.trifiltnames = trifiltnames self.gal_wavs_micron = gal_wavs_micron self.gal_ab_offsets = gal_ab_offsets self.gal_filtnames = gal_filtnames self.gal_alavs = gal_alavs self.tri_dens_cube = tri_dens_cube self.tri_dens_array = tri_dens_array self.dm = dm self.dd_params = dd_params self.l_cut = l_cut self.ax1_mids = ax1_mids self.ax2_mids = ax2_mids self.ax_dimension = ax_dimension self.pregenerate_cutouts = pregenerate_cutouts if self.pregenerate_cutouts is not None: if not self.pregenerate_cutouts: self.cutout_area = cutout_area self.cutout_height = cutout_height self.mag_arrays = [np.array(x) for x in mag_arrays] self.mag_slices = [np.array(x) for x in mag_slices] self.sig_slices = [np.array(x) for x in sig_slices] self.saturation_magnitudes = np.array(saturation_magnitudes) self.npy_or_csv = npy_or_csv self.coord_or_chunk = coord_or_chunk self.chunks = chunks self.coord_system = coord_system self.mn_fit_type = mn_fit_type self.apply_proper_motion_flag = apply_proper_motion_flag self.pm_move_to_epoch = pm_move_to_epoch if npy_or_csv == 'npy': self.pos_and_err_indices_full = pos_and_err_indices self.mag_indices = mag_indices self.snr_indices = snr_indices self.mag_names = mag_names self.correct_astro_mag_indices_index = correct_astro_mag_indices_index if apply_proper_motion_flag: self.pm_indices = pm_indices self.pm_ref_epoch_or_index = pm_ref_epoch_or_index else: # np.genfromtxt will load in pos_and_err or pos_and_err, mag_ind, # snr_ind order for the two catalogues. Each will effectively # change its ordering, since we then load [0] for pos_and_err[0][0], # etc. for all options. These need saving for np.genfromtxt but # also for obtaining the correct column in the resulting sub-set of # the loaded csv file. self.a_cols = np.array(pos_and_err_indices[1]) self.b_cols = np.concatenate((pos_and_err_indices[0], mag_indices, snr_indices)) if apply_proper_motion_flag: if pm_indices[0] is not None: self.b_cols = np.concatenate((self.b_cols, pm_indices[0])) if isinstance(pm_ref_epoch_or_index[0], int): self.b_cols = np.concatenate((self.b_cols, pm_ref_epoch_or_index[0])) if pm_indices[1] is not None: self.a_cols = np.concatenate((self.a_cols, pm_indices[1])) if isinstance(pm_ref_epoch_or_index[1], int): self.a_cols = np.concatenate((self.a_cols, pm_ref_epoch_or_index[1])) self.pos_and_err_indices_full = [ [np.argmin(np.abs(q - self.b_cols)) for q in pos_and_err_indices[0]], [np.argmin(np.abs(q - self.a_cols)) for q in pos_and_err_indices[1]]] self.mag_indices = [np.argmin(np.abs(q - self.b_cols)) for q in mag_indices] self.snr_indices = [np.argmin(np.abs(q - self.b_cols)) for q in snr_indices] self.mag_names = mag_names self.correct_astro_mag_indices_index = correct_astro_mag_indices_index if apply_proper_motion_flag: self.pm_indices = [None, None] self.pm_ref_epoch_or_index = [None, None] if pm_indices[0] is not None: self.pm_indices[0] = [np.argmin(np.abs(q - self.b_cols)) for q in pm_indices[0]] if isinstance(pm_ref_epoch_or_index[0], int): self.pm_ref_epoch_or_index[0] = np.argmin(np.abs(pm_ref_epoch_or_index[0] - self.b_cols)) else: self.pm_ref_epoch_or_index[0] = pm_ref_epoch_or_index[0] if pm_indices[1] is not None: self.pm_indices[1] = [np.argmin(np.abs(q - self.a_cols)) for q in pm_indices[1]] if isinstance(pm_ref_epoch_or_index[1], int): self.pm_ref_epoch_or_index[1] = np.argmin(np.abs(pm_ref_epoch_or_index[1] - self.a_cols)) else: self.pm_ref_epoch_or_index[1] = pm_ref_epoch_or_index[1] self.n_pool = n_pool self.use_photometric_uncertainties = use_photometric_uncertainties self.n_filt_cols = np.ceil(np.sqrt(len(self.mag_indices))).astype(int) self.n_filt_rows = np.ceil(len(self.mag_indices) / self.n_filt_cols).astype(int) for folder in [self.save_folder, f'{self.save_folder}/npy', f'{self.save_folder}/pdf']: if not (return_nm and 'npy' in folder): if not os.path.exists(folder): os.makedirs(folder) # pylint: disable-next=too-many-statements,too-many-branches
[docs] def __call__(self, a_cat=None, b_cat=None, a_cat_name=None, b_cat_name=None, a_cat_func=None, b_cat_func=None, tri_download=True, overwrite_all_sightlines=False, make_plots=False, make_summary_plot=True, seeing_ranges=None, single_or_repeat="single", repeat_unique_visits_list=None): """ Call function for the correction calculation process. Parameters ---------- a_cat : numpy.ndarray or list of numpy.ndarray, optional A pre-loaded, or list of pre-loaded, reference catalogue for the aiding in the determination the astrometric uncertainties for ``b_cat``. Should be provided if ``pregenerate_cutouts`` is ``None``, and must be itself ``None`` if ``a_cat_name`` is given. b_cat : numpy.ndarray or list of numpy.ndarray, optional A pre-loaded, or list of pre-loaded, catalogue for which to determination the astrometric uncertainties. Should be provided if ``pregenerate_cutouts`` is ``None``, and must be itself ``None`` if ``a_cat_name`` is given. a_cat_name : string, optional Name of the catalogue "a" filename, pre-generated or saved by ``a_cat_func``. Must accept one or two formats via Python string formatting (e.g. ``'a_string_{}'``) that represent ``chunk``, or ``ax1_mid`` and ``ax2_mid``, depending on ``coord_or_chunk``. Must be given if ``a_cat`` is ``None``. b_cat_name : string, optional Name of the catalogue "b" filename. Must accept the same string formatting as ``a_cat_name``. Must be given if ``b_cat`` is ``None``. a_cat_func : callable, optional Function used to generate reduced catalogue table for catalogue "a". Must be given if ``pregenerate_cutouts`` is ``False``. b_cat_func : callable Function used to generate reduced catalogue table for catalogue "b". Must be given if ``pregenerate_cutouts`` is ``False``. tri_download : boolean or None, optional Flag determining if TRILEGAL simulations should be re-downloaded if they already exist on disk. Should be ``None`` if ``tri_dens_cube`` was given in initialisation. overwrite_all_sightlines : boolean, optional Flag for whether to create a totally fresh run of astrometric corrections, regardless of whether``mn_sigs_array`` is saved on disk. Defaults to ``False``. make_plots : boolean, optional Determines if intermediate figures are generated in the process of deriving astrometric corrections. make_summary_plot : boolean, optional If ``True`` then final summary plot is created, even if ``make_plots`` is ``False`` and intermediate plots are not created. seeing_ranges : list or numpy.ndarray, optional If ``make_plots`` is True, must be a valid 1-, 2-, or 3-length list of floats, the seeing, in arcseconds, of observations, for plotting over astrometric precision vs SNR distributions. single_or_repeat : string, either "single" or "repeat" Flag indicating whether catalogue is made of a single set of observations or repeated "visits" to this patch of sky. If "single" then each row is assumed to be unique, but if "repeat" is given then ``repeat_unique_visits_list`` must be provided, giving the indices for which row comes from which unique repeat-observation of the sky region. repeat_unique_visits_list : list of list or numpy.ndarray of integers If ``single_or_repeat`` is "repeat", then these arrays must be provided, giving the ID of each sub-visit that has been merged into a single catalogue, for use in separating individual visits out for cross-matches and local normalising density calculations. Length must match ``a_cat`` and ``b_cat``, one set of visit indices per field pointing. """ if (a_cat is None and b_cat is not None) or (a_cat is not None and b_cat is None): raise ValueError("a_cat and b_cat must either both be None or both not be None.") if (a_cat_name is None and b_cat_name is not None) or (a_cat_name is not None and b_cat_name is None): raise ValueError("a_cat_name and b_cat_name must either both be None or both not be None.") if self.pregenerate_cutouts is not None and a_cat is not None: raise ValueError("pregenerate_cutouts must be None if a_cat is not None.") if a_cat_func is not None and a_cat is not None: raise ValueError("a_cat_func must be None if a_cat is not None.") if b_cat_func is not None and b_cat is not None: raise ValueError("b_cat_func must be None if b_cat is not None.") if self.pregenerate_cutouts is None and a_cat is None: raise ValueError("a_cat must not be None if pregenerate_cutouts is None.") if (a_cat is not None and a_cat_name is not None) or (a_cat is None and a_cat_name is None): raise ValueError("a_cat and a_cat_name must not both be None or both not be None.") if a_cat_func is None and self.pregenerate_cutouts is False: raise ValueError("a_cat_func must be given if pregenerate_cutouts is 'False'.") if b_cat_func is None and self.pregenerate_cutouts is False: raise ValueError("b_cat_func must be given if pregenerate_cutouts is 'False'.") if tri_download not in (None, True, False): raise ValueError("tri_download must either be True, False, or None.") if self.trifilepath is not None and tri_download not in (True, False): raise ValueError("tri_download must either be True or False if trifilepath given.") if tri_download is not None and self.tri_dens_cube is not None: raise ValueError("tri_download must be None if tri_dens_cube is given.") if make_plots and seeing_ranges is None: raise ValueError("seeing_ranges must be provided if make_plots is True.") if make_plots: seeing_ranges = np.array(seeing_ranges) if len(seeing_ranges) not in [1, 2, 3] or len(seeing_ranges.shape) != 1: raise ValueError("seeing_ranges must be a list of length 1, 2, or 3.") try: seeing_ranges = np.array([float(f) for f in seeing_ranges]) except ValueError as exc: raise ValueError('seeing_ranges should be a list of floats.') from exc self.seeing_ranges = seeing_ranges else: self.seeing_ranges = None if single_or_repeat not in ["single", "repeat"]: raise ValueError("single_or_repeat must either be 'single' or 'repeat'.") # For now, limit repeat observation handling to datasets passed through # directly, to avoid extra work loading those. if (not (self.pregenerate_cutouts is None and a_cat is not None)) and single_or_repeat == "repeat": raise ValueError("single_or_repeat cannot be 'repeat' unless pregenerate_cutouts is None and " "a_cat and b_cat are provided directly.") self.single_or_repeat = single_or_repeat if self.single_or_repeat == "repeat" and repeat_unique_visits_list is None: raise ValueError("repeat_unique_visits_list must be provided if single_or_repeat is 'repeat'.") if self.single_or_repeat == "repeat": self.repeat_unique_visits_list = repeat_unique_visits_list if isinstance(self.repeat_unique_visits_list, np.ndarray): self.repeat_unique_visits_list = [self.repeat_unique_visits_list] self.a_cat_func = a_cat_func self.b_cat_func = b_cat_func self.a_cat_name = a_cat_name self.b_cat_name = b_cat_name if isinstance(a_cat, np.ndarray): self.a_cat = [a_cat] else: self.a_cat = a_cat if isinstance(b_cat, np.ndarray): self.b_cat = [b_cat] else: self.b_cat = b_cat self.tri_download = tri_download self.make_plots = make_plots self.make_summary_plot = make_summary_plot self.make_ax_coords() if self.pregenerate_cutouts is False: self.make_catalogue_cutouts() # Making coords/cutouts happens for all sightlines, and then we # loop through each individually: if self.coord_or_chunk == 'coord': zip_list = (self.ax1_mids, self.ax2_mids) else: zip_list = (self.ax1_mids, self.ax2_mids, self.chunks) if self.use_photometric_uncertainties: shape = (len(self.ax1_mids), len(self.mag_names), 4) else: shape = (len(self.ax1_mids), 4) if (self.return_nm or overwrite_all_sightlines or not os.path.isfile(f'{self.save_folder}/npy/mn_sigs_array.npy')): if not self.return_nm: mn_sigs = open_memmap(f'{self.save_folder}/npy/mn_sigs_array.npy', mode='w+', dtype=float, shape=shape) else: mn_sigs = np.empty(dtype=float, shape=shape) mn_sigs[:] = -9999 else: mn_sigs = open_memmap(f'{self.save_folder}/npy/mn_sigs_array.npy', mode='r+') if self.make_summary_plot: if self.use_photometric_uncertainties: self.gs_single_sigsig = self.make_gridspec('single_sigsig', len(self.mag_names), 2, 0.8, 5) self.ax_b_sing = [plt.subplot(self.gs_single_sigsig[q, 0]) for q in range(len(self.mag_names))] sys.stdout.flush() self.ax_b_log_sing = [plt.subplot(self.gs_single_sigsig[q, 1]) for q in range(len(self.mag_names))] else: self.gs_single_sigsig = self.make_gridspec('single_sigsig', 1, 2, 0.8, 5) self.ax_b_sing = [plt.subplot(self.gs_single_sigsig[0])] self.ax_b_log_sing = [plt.subplot(self.gs_single_sigsig[1])] self.ylims_sing = [999, 0] self.cols = ['k', 'r', 'b', 'g', 'c', 'm', 'orange', 'brown', 'purple', 'grey', 'olive', 'cornflowerblue', 'deeppink', 'maroon', 'palevioletred', 'teal', 'crimson', 'chocolate', 'darksalmon', 'steelblue', 'slateblue', 'tan', 'yellowgreen', 'silver'] if self.use_photometric_uncertainties: shape = (len(self.ax1_mids), len(self.mag_names)) else: shape = len(self.ax1_mids) self.avg_b_dens = np.empty(shape, float) self.mn_poisson_cdfs = np.empty(shape, object) self.ind_poisson_cdfs = np.empty(shape, object) self.input_sigs = [] self.derived_sigs = [] for index_, list_of_things in enumerate(zip(*zip_list)): if np.all(mn_sigs[index_, :] != -9999): continue if self.coord_or_chunk == 'coord': ax1_mid, ax2_mid = list_of_things cat_args = (ax1_mid, ax2_mid) file_name = f'{ax1_mid}_{ax2_mid}' else: ax1_mid, ax2_mid, chunk = list_of_things cat_args = (chunk,) file_name = f'{chunk}' self.list_of_things = list_of_things self.cat_args = cat_args self.file_name = file_name t = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f'{t}, {self.file_name}: Running astrometry fits for sightline ' f'{index_+1}/{len(self.ax1_mids)}...') sys.stdout.flush() if self.pregenerate_cutouts is None: self.a = self.a_cat[index_] self.b = self.b_cat[index_] if self.single_or_repeat == 'repeat': self.repeat_unique_visits = self.repeat_unique_visits_list[index_] else: self.a = self.load_catalogue('a', self.cat_args) if self.apply_proper_motion_flag and self.pm_indices[1] is not None: pm_r_e = self.pm_ref_epoch_or_index[1] if not isinstance( self.pm_ref_epoch_or_index[1], int) else self.a[:, self.pm_ref_epoch_or_index[1]] x, y = apply_proper_motion( self.a[:, self.pos_and_err_indices_full[1][0]], self.a[:, self.pos_and_err_indices_full[1][1]], self.a[:, self.pm_indices[1][0]], self.a[:, self.pm_indices[1][1]], pm_r_e, self.pm_move_to_epoch, self.coord_system) self.a[:, self.pos_and_err_indices_full[1][0]] = x self.a[:, self.pos_and_err_indices_full[1][1]] = y self.b = self.load_catalogue('b', self.cat_args) if self.apply_proper_motion_flag and self.pm_indices[0] is not None: pm_r_e = self.pm_ref_epoch_or_index[0] if not isinstance( self.pm_ref_epoch_or_index[0], int) else self.b[:, self.pm_ref_epoch_or_index[0]] x, y = apply_proper_motion( self.b[:, self.pos_and_err_indices_full[0][0]], self.b[:, self.pos_and_err_indices_full[0][1]], self.b[:, self.pm_indices[0][0]], self.b[:, self.pm_indices[0][1]], pm_r_e, self.pm_move_to_epoch, self.coord_system) self.b[:, self.pos_and_err_indices_full[0][0]] = x self.b[:, self.pos_and_err_indices_full[0][1]] = y self.area, self.hull_points, self.hull_x_shift = convex_hull_area( self.b[:, self.pos_and_err_indices_full[0][0]], self.b[:, self.pos_and_err_indices_full[0][1]], return_hull=True) # At this point we need to loop to accommodate the possibility # that we're generating a per-band parameterisation. If # use_photometric_uncertainties is False this will be a single loop, # boringly doing nothing. for unc_index in range(len(self.pos_and_err_indices_full[0])-2): # If we aren't using photometric uncertainties then we only do # do a single loop through unc_index, but need to use the # correct_astro_mag_indices_index element of all of our # magnitude-related terms; however, if we are using photometry, # then unc_index loops as intended. p = unc_index if self.use_photometric_uncertainties else self.correct_astro_mag_indices_index if self.use_photometric_uncertainties: # Skip any magnitudes that are never detected! if np.sum(~np.isnan(self.b[:, self.mag_indices[p]])) == 0: continue self.psf_fwhm = self.psf_fwhms[p] self.gal_wav_micron = self.gal_wavs_micron[p] self.gal_ab_offset = self.gal_ab_offsets[p] self.gal_filtname = self.gal_filtnames[p] self.gal_alav = self.gal_alavs[p] self.mag_array = self.mag_arrays[p] self.mag_slice = self.mag_slices[p] self.sig_slice = self.sig_slices[p] if self.trifilepath is not None: # For record keeping, trifilepath and trifilterset, magnum, # maglim_f, and tri_num_faint aren't per-band, so don't get # selected out of a list. self.trifiltname = self.trifiltnames[p] else: # tri_dens_cube is of shape (S, F, x, 3), and we need to # determine which index into S and F we want. Assume that # we put our filter indices in the "right" order, and then # pull nearest-neighbour sky position. sky_index = mff.find_nearest_point([ax1_mid], [ax2_mid], self.tri_dens_array[:, 0], self.tri_dens_array[:, 1])[0] self.tri_hist = self.tri_dens_cube[sky_index, p, :, 0] self.tri_hist = self.tri_hist[~np.isnan(self.tri_hist)] self.tri_mags = self.tri_dens_cube[sky_index, p, :, 1] self.tri_mags = self.tri_mags[~np.isnan(self.tri_mags)] self.dtri_mags = self.tri_dens_cube[sky_index, p, :, 2] self.dtri_mags = self.dtri_mags[~np.isnan(self.dtri_mags)] self.psf_radius = 1.185 * self.psf_fwhm self.psfsig = self.psf_fwhm / (2 * np.sqrt(2 * np.log(2))) self.r = np.linspace(0, self.psf_radius, self.n_r) self.rho = np.linspace(0, self.max_rho, self.n_rho) self.dr, self.drho = np.diff(self.r), np.diff(self.rho) self.j0s = mff.calc_j0(self.rho[:-1]+self.drho/2, self.r[:-1]+self.dr/2) self.n_mag_cols = np.ceil(np.sqrt(len(self.mag_array))).astype(int) self.n_mag_rows = np.ceil(len(self.mag_array) / self.n_mag_cols).astype(int) self.unc_index = unc_index self.pos_and_err_indices = [ [self.pos_and_err_indices_full[0][0], self.pos_and_err_indices_full[0][1], self.pos_and_err_indices_full[0][unc_index+2]], self.pos_and_err_indices_full[1]] if self.use_photometric_uncertainties: self.magnitude_append = f'{self.mag_names[unc_index]}_' else: self.magnitude_append = '' if self.use_photometric_uncertainties: n_sources = np.sum(~np.isnan(self.b[:, self.pos_and_err_indices[0][2]])) else: n_sources = len(self.b) if self.single_or_repeat == 'repeat': # Divide the density through by the number of repeat visits, # since we want the average density of the visits, not the # sum of all repeated visits. n_visits = len(np.unique(self.repeat_unique_visits)) if self.use_photometric_uncertainties: self.avg_b_dens[index_, unc_index] = n_sources / self.area / n_visits else: self.avg_b_dens[index_] = n_sources / self.area / n_visits else: if self.use_photometric_uncertainties: self.avg_b_dens[index_, unc_index] = n_sources / self.area else: self.avg_b_dens[index_] = n_sources / self.area self.make_star_galaxy_counts() if self.make_plots: self.plot_star_galaxy_counts() self.calculate_local_densities_and_nearest_neighbours() self.simulate_aufs() self.create_auf_pdfs() # If we don't have at least 5 unique magnitude slices to calculate, # reduce the number of data points per histogram to increase # number statistics. if np.sum([q[0] == -1 for q in self.pdfs]) > len(self.pdfs)-5: warnings.warn("Reduced PDF histogram counts to 75.") self.create_auf_pdfs(min_hist_cut=75) if np.sum([q[0] == -1 for q in self.pdfs]) > len(self.pdfs)-5: warnings.warn("Reduced PDF histogram counts to 50.") self.create_auf_pdfs(min_hist_cut=50) m_sig, n_sig = self.fit_uncertainty() if not np.sum([q[0] == -1 for q in self.pdfs]) <= len(self.pdfs)-5: # Fall back to not correcting anything if data still too poor # to draw any meaningful conclusions from. if not self.use_photometric_uncertainties: m_sig, n_sig = 1, 0 else: # For photometric uncertainties, we know we at minimum # have a PSF FWHM correction factor to include. m_sig, n_sig = self.psf_fwhm, 0 # Keep fit_sigs[:, 1] as the individual fits we # were able to make. self.fit_sigs[:, 0] = self.avg_sig[:, 0] if self.use_photometric_uncertainties: mn_sigs[index_, unc_index, 0] = m_sig mn_sigs[index_, unc_index, 1] = n_sig mn_sigs[index_, unc_index, 2] = ax1_mid mn_sigs[index_, unc_index, 3] = ax2_mid else: mn_sigs[index_, 0] = m_sig mn_sigs[index_, 1] = n_sig mn_sigs[index_, 2] = ax1_mid mn_sigs[index_, 3] = ax2_mid mn_poisson_cdfs, ind_poisson_cdfs = self.plot_fits_calculate_cdfs() if self.make_summary_plot: if self.use_photometric_uncertainties: self.mn_poisson_cdfs[index_, unc_index] = mn_poisson_cdfs self.ind_poisson_cdfs[index_, unc_index] = ind_poisson_cdfs else: self.mn_poisson_cdfs[index_] = mn_poisson_cdfs self.ind_poisson_cdfs[index_] = ind_poisson_cdfs if np.sum(~self.skip_flags) > 0: c = self.cols[index_ % len(self.cols)] plt.figure('single_sigsig') self.ax_b_sing[unc_index].errorbar(self.avg_sig[~self.skip_flags, 0], self.fit_sigs[~self.skip_flags, 1], linestyle='None', c=c, marker='x', label=file_name) self.ax_b_log_sing[unc_index].errorbar(self.avg_sig[~self.skip_flags, 0], self.fit_sigs[~self.skip_flags, 1], linestyle='None', c=c, marker='x') self.ylims_sing[0] = min(self.ylims_sing[0], np.amin(self.fit_sigs[~self.skip_flags, 1])) self.ylims_sing[1] = max(self.ylims_sing[1], np.amax(self.fit_sigs[~self.skip_flags, 1])) self.input_sigs.append(self.avg_sig[~self.skip_flags, 0]) self.derived_sigs.append(self.fit_sigs[~self.skip_flags, 1]) self.plot_snr_mag_sig() self.mn_sigs = mn_sigs if self.make_summary_plot: self.finalise_summary_plots() if self.return_nm: return mn_sigs return None
[docs] def make_ax_coords(self, check_b_only=False): """ Derive the unique ax1-ax2 combinations used in fitting astrometry, and calculate corner coordinates based on the size of the box and its central coordinates. Parameters ========== check_b_only : boolean, optional Overloadable flag for cases where we can ignore catalogue 'a' and only need to check for catalogue 'b' existing if pregenerate_cutouts is True. """ # If ax1 and ax2 are given as one-dimensional arrays, we need to # propagate those into two-dimensional grids first. Otherwise we # can skip this step. if self.ax_dimension == 1: self.ax1_mids_ = np.copy(self.ax1_mids) self.ax2_mids_ = np.copy(self.ax2_mids) self.ax1_mids, self.ax2_mids = np.meshgrid(self.ax1_mids_, self.ax2_mids_) self.ax1_mids, self.ax2_mids = self.ax1_mids.flatten(), self.ax2_mids.flatten() self.ax1_grid_length = len(self.ax1_mids_) self.ax2_grid_length = len(self.ax2_mids_) else: self.ax1_grid_length = np.ceil(np.sqrt(len(self.ax1_mids))).astype(int) self.ax2_grid_length = np.ceil(len(self.ax1_mids) / self.ax1_grid_length).astype(int) if self.pregenerate_cutouts is False: # If we don't force pre-generated sightlines then we can generate # corners based on requested size and height, and latitude. Force # constant box height, but allow longitude to float to make sure # that we get good area coverage as the cos-delta factor increases # towards the poles. self.ax1_mins, self.ax1_maxs = np.empty_like(self.ax1_mids), np.empty_like(self.ax1_mids) self.ax2_mins, self.ax2_maxs = np.empty_like(self.ax1_mids), np.empty_like(self.ax1_mids) for i, (ax1_mid, ax2_mid) in enumerate(zip(self.ax1_mids, self.ax2_mids)): self.ax2_mins[i] = ax2_mid-self.cutout_height/2 self.ax2_maxs[i] = ax2_mid+self.cutout_height/2 lat_integral = (np.sin(np.radians(self.ax2_maxs[i])) - np.sin(np.radians(self.ax2_mins[i]))) * 180/np.pi if 360 * lat_integral < self.cutout_area: # If sufficiently high in latitude, we ought to be able to take # a full latitudinal slice around the entire sphere. delta_lon = 180 else: delta_lon = np.around(0.5 * self.cutout_area / lat_integral, decimals=1) # Handle wrap-around longitude maths naively by forcing 0/360 as the # minimum/maximum allowed limits of each box. # pylint: disable-next=fixme # TODO: once the rest of AstrometricCorrections handles the wraparound # logic, relax this requirement. if ax1_mid - delta_lon < 0: self.ax1_mins[i] = 0 self.ax1_maxs[i] = 2 * delta_lon elif ax1_mid + delta_lon > 360: self.ax1_maxs[i] = 360 self.ax1_mins[i] = 360 - 2 * delta_lon else: self.ax1_mins[i] = ax1_mid - delta_lon self.ax1_maxs[i] = ax1_mid + delta_lon else: # If we've definitely already made all of the cutouts, the corners # are preset. if self.coord_or_chunk == 'coord': zip_list = (self.ax1_mids, self.ax2_mids) else: zip_list = (self.chunks,) for i, list_of_things in enumerate(zip(*zip_list)): if self.coord_or_chunk == 'coord': ax1_mid, ax2_mid = list_of_things cat_args = (ax1_mid, ax2_mid) else: chunk, = list_of_things cat_args = (chunk,) if self.pregenerate_cutouts is not None: if not check_b_only: if not os.path.isfile(self.a_cat_name.format(*cat_args)): raise ValueError("If pregenerate_cutouts is 'True' all files must " f"exist already, but {self.a_cat_name.format(*cat_args)} " "does not.") if not os.path.isfile(self.b_cat_name.format(*cat_args)): raise ValueError("If pregenerate_cutouts is 'True' all files must " f"exist already, but {self.b_cat_name.format(*cat_args)} does not.")
[docs] def make_catalogue_cutouts(self): """ Generate cutout catalogues for regions as defined by corner ax1-ax2 coordinates. """ if self.coord_or_chunk == 'coord': zip_list = (self.ax1_mids, self.ax2_mids, self.ax1_mins, self.ax1_maxs, self.ax2_mins, self.ax2_maxs) else: zip_list = (self.chunks, self.ax1_mins, self.ax1_maxs, self.ax2_mins, self.ax2_maxs) for index_, list_of_things in enumerate(zip(*zip_list)): if self.coord_or_chunk == 'coord': ax1_mid, ax2_mid, ax1_min, ax1_max, ax2_min, ax2_max = list_of_things file_name = f'{ax1_mid}_{ax2_mid}' else: chunk, ax1_min, ax1_max, ax2_min, ax2_max = list_of_things file_name = f'{chunk}' t = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f'{t}, {file_name}: Creating catalogue cutouts... {index_+1}/{len(self.ax1_mids)}', end='\r') sys.stdout.flush() if self.coord_or_chunk == 'coord': cat_args = (ax1_mid, ax2_mid) else: cat_args = (chunk,) if not os.path.isfile(self.a_cat_name.format(*cat_args)): self.a_cat_func(ax1_min, ax1_max, ax2_min, ax2_max, *cat_args) if not os.path.isfile(self.b_cat_name.format(*cat_args)): self.b_cat_func(ax1_min, ax1_max, ax2_min, ax2_max, *cat_args) print('') sys.stdout.flush()
[docs] def make_gridspec(self, name, y, x, ratio, z, **kwargs): """ Convenience function to generate a matplotlib canvas and populate it with a gridspec grid. Parameters ---------- name : string Unique identifier for the figure returned. y : int Number of rows in the resulting grid of axes. x : int Number of columns in the figure grid. ratio : float Aspect ratio of axis, with ``0.5`` being half as tall as it is wide. z : float Width of a single axis in real figure units, likely inches. Returns ------- gs : ~`matplotlib.gridspec.GridSpec` The created gridspec instance. """ plt.figure(name, figsize=(z*x, z*ratio*y)) gs = gridspec.GridSpec(y, x, **kwargs) return gs
[docs] def make_star_galaxy_counts(self): """ Generate differential source counts for each cutout region, simulating both stars and galaxies. """ # pylint: disable-next=fixme # TODO: load from CrossMatch in some fashion to avoid errors from # updating one but not the other. self.gal_cmau_array = np.empty((5, 2, 4), float) # See Wilson (2022, RNAAS, 6, 60) for the meanings of the variables c, m, # a, and u. For each of M*/phi*/alpha/P/Q, for blue+red galaxies, 2-4 # variables are derived as a function of wavelength, or Q(P). self.gal_cmau_array[0, :, :] = [[-24.286513, 1.141760, 2.655846, np.nan], [-23.192520, 1.778718, 1.668292, np.nan]] self.gal_cmau_array[1, :, :] = [[0.001487, 2.918841, 0.000510, np.nan], [0.000560, 7.691261, 0.003330, -0.065565]] self.gal_cmau_array[2, :, :] = [[-1.257761, 0.021362, np.nan, np.nan], [-0.309077, -0.067411, np.nan, np.nan]] self.gal_cmau_array[3, :, :] = [[-0.302018, 0.034203, np.nan, np.nan], [-0.713062, 0.233366, np.nan, np.nan]] self.gal_cmau_array[4, :, :] = [[1.233627, -0.322347, np.nan, np.nan], [1.068926, -0.385984, np.nan, np.nan]] self.gal_alpha0 = [[2.079, 3.524, 1.917, 1.992, 2.536], [2.461, 2.358, 2.568, 2.268, 2.402]] self.gal_alpha1 = [[2.265, 3.862, 1.921, 1.685, 2.480], [2.410, 2.340, 2.200, 2.540, 2.464]] self.gal_alphaweight = [[3.47e+09, 3.31e+06, 2.13e+09, 1.64e+10, 1.01e+09], [3.84e+09, 1.57e+06, 3.91e+08, 4.66e+10, 3.03e+07]] t = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f'{t}, {self.file_name}: Creating simulated star+galaxy counts...') sys.stdout.flush() if self.coord_or_chunk == 'coord': ax1_mid, ax2_mid = self.list_of_things else: ax1_mid, ax2_mid, _ = self.list_of_things p_ind = self.unc_index if self.use_photometric_uncertainties else self.correct_astro_mag_indices_index mag_ind = self.mag_indices[p_ind] b_mag_data = self.b[~np.isnan(self.b[:, mag_ind]), mag_ind] hist_mag, bins = np.histogram(b_mag_data, bins='auto') minmag = bins[0] # Ensure that we're only counting sources for normalisation purposes # down to the completeness turnover. # pylint: disable-next=fixme # TODO: make the half-mag offset flexible, passing from CrossMatch and/or # directly into AstrometricCorrections. maxmag = bins[:-1][np.argmax(hist_mag)] - 0.5 if self.trifilepath is not None: x, y = os.path.splitext(self.trifilepath.format(ax1_mid, ax2_mid)) full_file = x + '_faint' + y # pylint: disable-next=possibly-used-before-assignment if (self.trifilepath is not None and (self.tri_download or not os.path.isfile(full_file))): data_bright_dens = np.sum(~np.isnan(b_mag_data) & (b_mag_data <= maxmag)) / self.area # pylint: disable-next=fixme # TODO: un-hardcode min_bright_tri_number min_bright_tri_number = 1000 min_area = min_bright_tri_number / data_bright_dens download_trilegal_simulation(full_file, self.trifilterset, ax1_mid, ax2_mid, self.magnum, self.coord_system, self.maglim_f, min_area, av=1, sigma_av=0, total_objs=self.tri_num_faint) avs = generate_avs_inside_hull(self.hull_points, self.hull_x_shift, self.coord_system) if self.trifilepath is not None: # Don't pass the _faint-appended filepath to make_tri_counts, since it # handles that itself. tri_hist, tri_mags, dtri_mags = make_tri_counts( self.trifilepath.format(ax1_mid, ax2_mid), self.trifiltname, self.dm, np.amin(b_mag_data), al_av=self.gal_alav, av_grid=avs) else: tri_hist, tri_mags = self.tri_hist, self.tri_mags dtri_mags = self.dtri_mags gal_dns = create_galaxy_counts( self.gal_cmau_array, tri_mags+dtri_mags/2, np.linspace(0, 4, 41), self.gal_wav_micron, self.gal_alpha0, self.gal_alpha1, self.gal_alphaweight, self.gal_ab_offset, self.gal_filtname, self.gal_alav*avs) d_hc = np.where(hist_mag > 3)[0] data_hist = hist_mag[d_hc] data_dbins = np.diff(bins)[d_hc] data_bins = bins[d_hc] data_uncert = np.sqrt(data_hist) / data_dbins / self.area data_hist = data_hist / data_dbins / self.area if self.single_or_repeat == 'repeat': # Divide the counts through by the number of repeat visits. n_visits = len(np.unique(self.repeat_unique_visits)) data_hist = data_hist / n_visits data_uncert = data_uncert / np.sqrt(n_visits) data_loghist = np.log10(data_hist) data_dloghist = 1/np.log(10) * data_uncert / data_hist q = (data_bins <= maxmag) & (data_bins >= self.saturation_magnitudes[p_ind]) tri_corr, gal_corr = find_model_counts_corrections(data_loghist[q], data_dloghist[q], data_bins[q]+data_dbins[q]/2, tri_hist, gal_dns, tri_mags+dtri_mags/2) log10y = np.log10(tri_hist * tri_corr + gal_dns * gal_corr) mag_slice = (tri_mags >= minmag) & (tri_mags+dtri_mags <= maxmag) n_norm = np.sum(10**log10y[mag_slice] * dtri_mags[mag_slice]) self.log10y = log10y self.tri_hist, self.tri_mags, self.dtri_mags = tri_hist, tri_mags, dtri_mags self.gal_dns = gal_dns self.minmag, self.maxmag, self.n_norm = minmag, maxmag, n_norm self.tri_corr, self.gal_corr = tri_corr, gal_corr
[docs] def plot_star_galaxy_counts(self): """ Plotting routine to display data and model differential source counts, for verification purposes. """ gs = self.make_gridspec('123123', 1, 1, 0.8, 5) t = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f'{t}, {self.file_name}: Plotting data and model counts...') sys.stdout.flush() p_ind = self.unc_index if self.use_photometric_uncertainties else self.correct_astro_mag_indices_index mag_ind = self.mag_indices[p_ind] data_mags = self.b[~np.isnan(self.b[:, mag_ind]), mag_ind] ax = plt.subplot(gs[0]) ax.errorbar(self.tri_mags+self.dtri_mags/2, self.log10y, c='k', marker='.', zorder=1, ls='None') data_hist, data_bins = np.histogram(data_mags, bins='auto') d_hc = np.where(data_hist > 3)[0] data_hist = data_hist[d_hc] data_dbins = np.diff(data_bins)[d_hc] data_bins = data_bins[d_hc] data_uncert = np.sqrt(data_hist) / data_dbins / self.area data_hist = data_hist / data_dbins / self.area if self.single_or_repeat == 'repeat': # Divide the counts through by the number of repeat visits. n_visits = len(np.unique(self.repeat_unique_visits)) data_hist = data_hist / n_visits data_uncert = data_uncert / np.sqrt(n_visits) data_loghist = np.log10(data_hist) data_dloghist = 1/np.log(10) * data_uncert / data_hist ax.errorbar(data_bins+data_dbins/2, data_loghist, yerr=data_dloghist, c='r', marker='.', zorder=1, ls='None') lims = ax.get_ylim() q = self.tri_hist > 0 ax.plot((self.tri_mags+self.dtri_mags/2)[q], np.log10(self.tri_hist[q]) + np.log10(self.tri_corr), 'k--') q = self.gal_dns > 0 ax.plot((self.tri_mags+self.dtri_mags/2)[q], np.log10(self.gal_dns[q]) + np.log10(self.gal_corr), 'k:') ax.set_ylim(*lims) ax.set_xlabel(f'{self.mag_names[self.unc_index]} / mag') if usetex: ax.set_ylabel(r'log$_{10}\left(\mathrm{D}\ /\ \mathrm{mag}^{-1}\,' r'\mathrm{deg}^{-2}\right)$') else: ax.set_ylabel(r'log10(D / mag^-1 deg^-2)') plt.figure('123123') plt.tight_layout() plt.savefig(f'{self.save_folder}/pdf/{self.magnitude_append}counts_comparison_{self.file_name}.pdf') plt.close()
[docs] def calculate_local_densities_and_nearest_neighbours(self): """ Calculate local normalising catalogue densities and catalogue-catalogue nearest neighbour match pairings for each cutout region. """ t = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f'{t}, {self.file_name}: Creating local densities and nearest neighbour matches...') sys.stdout.flush() p_ind = self.unc_index if self.use_photometric_uncertainties else self.correct_astro_mag_indices_index narray = create_densities( self.b, self.minmag, self.maxmag, self.hull_points, self.hull_x_shift, self.dens_search_radius, self.n_pool, self.mag_indices[p_ind], self.pos_and_err_indices[0][0], self.pos_and_err_indices[0][1], self.coord_system, self.file_name) if self.single_or_repeat == 'repeat': # Divide the counts through by the number of repeat visits. n_visits = len(np.unique(self.repeat_unique_visits)) narray = narray / n_visits if self.single_or_repeat == 'single': _, bmatch, dists = create_distances( self.a, self.b, self.nn_radius, self.pos_and_err_indices[1][0], self.pos_and_err_indices[1][1], self.pos_and_err_indices[0][0], self.pos_and_err_indices[0][1], self.coord_system) else: bmatch, dists = [], [] for v in np.unique(self.repeat_unique_visits): q = self.repeat_unique_visits == v _, _bm, _d = create_distances( self.a, self.b[q], self.nn_radius, self.pos_and_err_indices[1][0], self.pos_and_err_indices[1][1], self.pos_and_err_indices[1][0], self.pos_and_err_indices[0][1], self.coord_system) # The _bm array returned is [0, len(self.b[q])] indexed, so we # need to convert to indices back into self.b. b_match_large = np.arange(len(self.b))[q] if len(bmatch) == 0: bmatch = b_match_large[_bm] dists = _d else: bmatch = np.concatenate((bmatch, b_match_large[_bm])) dists = np.concatenate((dists, _d)) # pylint: disable-next=fixme # TODO: extend to 3-D search around N-m-sig to find as many good # enough bins as possible, instead of only keeping one N-sig bin # per magnitude? _h, _b = np.histogram(narray[bmatch], bins='auto') moden = (_b[:-1]+np.diff(_b)/2)[np.argmax(_h)] dn = 0.05*moden self.narray, self.dists, self.bmatch = narray, dists, bmatch self.moden, self.dn = moden, dn
[docs] def simulate_aufs(self): """ Simulate unresolved blended contaminants for each magnitude-sightline combination, for both aperture photometry and background-dominated PSF algorithms. """ t = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f'{t}, {self.file_name}: Creating AUF simulations...') sys.stdout.flush() b_ratio = 0.05 p_ind = self.unc_index if self.use_photometric_uncertainties else self.correct_astro_mag_indices_index _snr = self.b[:, self.snr_indices[p_ind]] _mag = self.b[:, self.mag_indices[p_ind]] p = ((_snr > 0) & ~np.isnan(_snr) & np.isfinite(_snr) & (_mag > 0) & ~np.isnan(_mag) & np.isfinite(_mag)) snr, _, _ = binned_statistic(_mag[p], _snr[p], statistic='median', bins=np.append(self.mag_array-self.mag_slice, self.mag_array[-1]+self.mag_slice[-1])) dm_max = _calculate_magnitude_offsets(b_ratio, snr) seed = np.random.default_rng().choice(100000, size=(mff.get_random_seed_size(), len(self.mag_array))) _, _, four_off_fw, _, _ = \ paf.perturb_aufs( self.moden*np.ones_like(self.mag_array), self.mag_array, self.r[:-1]+self.dr/2, self.dr, self.r, self.j0s.T, self.tri_mags+self.dtri_mags/2, self.dtri_mags, self.log10y, self.n_norm, (dm_max/self.dm).astype(int), self.dmcut, self.psf_radius, self.psfsig, self.numtrials, seed, self.dd_params, self.l_cut, 'fw') seed = np.random.default_rng().choice(100000, size=(mff.get_random_seed_size(), len(self.mag_array))) _, _, four_off_ps, _, _ = \ paf.perturb_aufs( self.moden*np.ones_like(self.mag_array), self.mag_array, self.r[:-1]+self.dr/2, self.dr, self.r, self.j0s.T, self.tri_mags+self.dtri_mags/2, self.dtri_mags, self.log10y, self.n_norm, (dm_max/self.dm).astype(int), self.dmcut, self.psf_radius, self.psfsig, self.numtrials, seed, self.dd_params, self.l_cut, 'psf') self.four_off_fw, self.four_off_ps = four_off_fw, four_off_ps
[docs] def create_auf_pdfs(self, min_hist_cut=100): """ Using perturbation AUF simulations, generate probability density functions of perturbation distance for all cutout regions, as well as recording key statistics such as average magnitude or SNR. Parameters ---------- min_hist_cut : integer, optional Number of data points in each magnitude-uncertainty slice to be considered for fitting for scaling relations. """ t = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f'{t}, {self.file_name}: Creating catalogue AUF probability densities...') sys.stdout.flush() b_matches = self.b[self.bmatch] skip_flags = np.zeros_like(self.mag_array, dtype=bool) pdfs, pdf_uncerts, q_pdfs, pdf_bins = [], [], [], [] nums = [] avg_sig = np.empty((len(self.mag_array), 3), float) avg_snr = np.empty((len(self.mag_array), 3), float) avg_mag = np.empty((len(self.mag_array), 3), float) dist_stds = np.empty(len(self.mag_array), float) p_ind = self.unc_index if self.use_photometric_uncertainties else self.correct_astro_mag_indices_index mag_ind = self.mag_indices[p_ind] for i, mag in enumerate(self.mag_array): mag_cut = ((b_matches[:, mag_ind] <= mag+self.mag_slice[i]) & (b_matches[:, mag_ind] >= mag-self.mag_slice[i])) if np.sum(mag_cut) == 0: skip_flags[i] = 1 pdfs.append([-1]) pdf_uncerts.append([-1]) q_pdfs.append([-1]) pdf_bins.append([-1]) nums.append([-1]) continue sig = np.percentile(b_matches[mag_cut, self.pos_and_err_indices[0][2]], 50) sig_cut = ((b_matches[:, self.pos_and_err_indices[0][2]] <= sig+self.sig_slice[i]) & (b_matches[:, self.pos_and_err_indices[0][2]] >= sig-self.sig_slice[i])) _h, _b = np.histogram(self.narray[self.bmatch][mag_cut & sig_cut], bins='auto') moden = (_b[:-1]+np.diff(_b)/2)[np.argmax(_h)] dn = 0.05*moden n_cut = (self.narray[self.bmatch] >= moden-dn) & (self.narray[self.bmatch] <= moden+dn) # Since we expect the astrometric/photometric scaling to be roughly # a factor FWHM/(2 * sqrt(2 * ln(2))), i.e. the sigma of a # Gaussian-ish PSF, scale the "20-sigma" cut accordingly: if not self.use_photometric_uncertainties: final_slice = sig_cut & mag_cut & n_cut & (self.dists <= 20*sig) else: # Compare the photometric-error-based astrometric uncertainty for # a 20-sigma cut, but in cases where this is disconnected from # the astrometry we floor this at the median distance of the # first three cuts. if np.sum(sig_cut & mag_cut & n_cut) > 0: med_dist = np.median(self.dists[sig_cut & mag_cut & n_cut]) compare_sig = max(self.psfsig*sig, med_dist) final_slice = sig_cut & mag_cut & n_cut & (self.dists <= 20*compare_sig) else: final_slice = sig_cut & mag_cut & n_cut final_dists = self.dists[final_slice] if len(final_dists) < min_hist_cut: skip_flags[i] = 1 pdfs.append([-1]) pdf_uncerts.append([-1]) q_pdfs.append([-1]) pdf_bins.append([-1]) nums.append([-1]) continue bm = b_matches[final_slice] p = ((bm[:, self.snr_indices[p_ind]] > 0) & ~np.isnan(bm[:, self.snr_indices[p_ind]]) & np.isfinite(bm[:, self.snr_indices[p_ind]])) snr = bm[p, self.snr_indices[p_ind]] avg_snr[i, 0] = np.median(snr) avg_snr[i, [1, 2]] = np.abs(np.percentile(snr, [16, 84]) - np.median(snr)) avg_mag[i, 0] = np.median(bm[:, mag_ind]) avg_mag[i, [1, 2]] = np.abs(np.percentile(bm[:, mag_ind], [16, 84]) - np.median(bm[:, mag_ind])) avg_sig[i, 0] = np.median(bm[:, self.pos_and_err_indices[0][2]]) avg_sig[i, [1, 2]] = np.abs(np.percentile(bm[:, self.pos_and_err_indices[0][2]], [16, 84]) - np.median(bm[:, self.pos_and_err_indices[0][2]])) dist_stds[i] = np.std(final_dists) h, bins = np.histogram(final_dists, bins='auto') num = np.sum(h) pdf = h / np.diff(bins) / num pdf_uncert = np.sqrt(h) / np.diff(bins) / num # To avoid binned_statistic NaNing when the data are beyond the # edge of the model, we want to limit our bins to the maximum r value. q_pdf = bins[1:] <= self.r[-1] pdfs.append(pdf) pdf_uncerts.append(pdf_uncert) q_pdfs.append(q_pdf) pdf_bins.append(bins) nums.append(num) self.nums = nums self.avg_snr, self.avg_mag, self.avg_sig = avg_snr, avg_mag, avg_sig self.pdfs, self.pdf_uncerts = pdfs, pdf_uncerts self.q_pdfs, self.pdf_bins = q_pdfs, pdf_bins self.skip_flags = skip_flags self.dist_stds = dist_stds
[docs] def fit_uncertainty(self): """ For all magnitudes for eachsightline, fit for the empirical centroid uncertainty relationship that describes the distribution of match separations. Returns ------- m : float The multiplicative factor for converting old uncertainties to new astrometric precision. n : float Quadratically added constant uncertainty to add to scaling between input and output uncertainties. """ t = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f'{t}, {self.file_name}: Creating joint H/sig fits...') sys.stdout.flush() self.fit_sigs = np.zeros((len(self.mag_array), 2), float) res = minimize(self.fit_auf_joint, x0=[1, 0.001], method='L-BFGS-B', options={'ftol': 1e-9}, bounds=[(0, None), (0, None)]) self.res = res m, n = res.x for i, pdf in enumerate(self.pdfs): if pdf[0] == -1: self.skip_flags[i] = 1 continue sig_orig = self.avg_sig[i, 0] if self.mn_fit_type == 'quadratic': new_sig = np.sqrt((m * sig_orig)**2 + n**2) else: new_sig = m * sig_orig + n self.fit_sigs[i, 0] = new_sig y, q, bins, snr, num = pdf, self.q_pdfs[i], self.pdf_bins[i], self.avg_snr[i, 0], self.nums[i] sig_est = self.dist_stds[i] / 0.655 res = minimize(self.calc_single_joint_auf, x0=[sig_est], args=(i, bins, y, q, num, snr), method='L-BFGS-B', options={'ftol': 1e-9}) # , bounds=[(0, None)]) self.fit_sigs[i, 1] = np.abs(res.x[0]) return m, n
[docs] def fit_auf_joint(self, p): """ Fits all magnitude slices for a single sightline, modelling empirically derived centroid uncertainties in all cases in addition to fixed uncertainty contributions. Parameters ---------- p : list List containing the scaling values to be fit for. Returns ------- neg_log_like : float Negative-log-likelihood of the fit, to be minimised in the wrapping function call. """ m, n = p neg_log_like = 0 for i, pdf in enumerate(self.pdfs): if self.pdfs[i][0] == -1: continue (y, q, bins, sig, snr, num) = (pdf, self.q_pdfs[i], self.pdf_bins[i], self.avg_sig[i, 0], self.avg_snr[i, 0], self.nums[i]) if self.mn_fit_type == 'quadratic': o = np.sqrt((m * sig)**2 + n**2) else: o = m * sig + n neg_log_like += self.calc_single_joint_auf(o, i, bins, y, q, num, snr) return neg_log_like
[docs] def calc_single_joint_auf(self, o, i, bins, y, q, num, snr): """ Calculates the negative-log-likelihood of a single magnitude slice of match separations as fit with an AUF. Parameters ---------- o : float The Gaussian uncertainty of the centroid component of the AUF. i : int Index to access particular magnitude slice. bins : numpy.ndarray Array of floats, bin edges of histogram of cross-match distances. y : numpy.ndarray Array of floats, counts in each bin of ``bins`` of cross-match distances. q : numpy.ndarray Array of booleans, flags for whether to use each ``y`` bin or not. num : int Total number of counts within each PDF in ``y``, to convert back to raw counts for Poisson counting statistics. snr: numpy.ndarray Array of signal-to-noise ratios, for weighting between different algorithms for the Perturbation component of the AUF. Returns ------- neg_log_like_part : numpy.ndarray The negative-log-likelihood of dataset of separations given the AUF model. """ p_ind = self.unc_index if self.use_photometric_uncertainties else self.correct_astro_mag_indices_index _snr = self.b[:, self.snr_indices[p_ind]] p = ((_snr > 0) & ~np.isnan(_snr) & np.isfinite(_snr)) n_sources_inv_max_snr = 1000 inv_max_snr = 1 / np.percentile(_snr[p][np.argsort(_snr[p])][n_sources_inv_max_snr:], 50) h = 1 - np.sqrt(1 - min(1, inv_max_snr**2 * snr**2)) four_combined = h * self.four_off_fw[:, i] + (1 - h) * self.four_off_ps[:, i] four_gauss = np.exp(-2 * np.pi**2 * (self.rho[:-1]+self.drho/2)**2 * o**2) convolve_hist = paf.fourier_transform( four_combined*four_gauss, self.rho[:-1]+self.drho/2, self.drho, self.j0s) convolve_hist_dr = convolve_hist * np.pi * (self.r[1:]**2 - self.r[:-1]**2) / self.dr # We have two components to fit to our data: first, some fraction F # of the objects do not have a counterpart and therefore follow # a nearest-neighbour distribution, n(r), while 1-F of the objects follow # an NN-contaminated counterpart distribution, m(r). # Here n(r) = 2 pi r rho exp(-pi r^2 rho) and # N(r) = \int_0^r n(r') dr' = 1 - exp(-pi r^2 rho). # We then have to consider two contributions to our match distribution; # the match could either be a counterpart if there's no NN inside of it, # or an NN if there's no counterpart inside of it, and hence # m(r) = c(r)[1 - N(r)] + n(r)[1 - C(r)], with c(r) the empirical # distribution of counterparts and C(r) = \int_0^r c(r') dr'. # Finally, the combination y(r) = F n(r) + (1 - F) m(r). avg_a_dens = len(self.a) / self.area # For a symmetric nearest neighbour distribution we need to use the # combined density of sources -- this is derived from consideration # of the probability of no NN object inside r being the chance that # NEITHER source has an asymmetric nearest neighbour inside r, or # F(r) = (1 - \int a(r) dr) * (1 - \int b(r) dr), where the integrals # of a and b, being of the form 2 pi r N exp(-pi r^2 N) are # 1 - exp(-pi r^2 N), and hence the survival integrals multiplied # together form N' = N_a + N_b, and then you use f(r) = dF/dr. density = (self.moden + avg_a_dens) / 3600**2 nn_model = 2 * np.pi * (self.r[:-1]+self.dr/2) * density * np.exp( -np.pi * (self.r[:-1]+self.dr/2)**2 * density) m_conv_plus_nn = (convolve_hist_dr * (np.exp(-np.pi * (self.r[:-1]+self.dr/2)**2 * density)) + nn_model * (1 - np.cumsum(convolve_hist_dr * self.dr))) reduced_nn_model, _, _ = binned_statistic(self.r[:-1]+self.dr/2, nn_model, bins=bins) reduced_m_conv_plus_nn, _, _ = binned_statistic(self.r[:-1]+self.dr/2, m_conv_plus_nn, bins=bins) # Empty binned_statistic bins return as NaNs. _q = q & ~np.isnan(reduced_nn_model) & ~np.isnan(reduced_m_conv_plus_nn) # For the subset of data in the single magnitude slice we need # a nearest neighbour fraction, which we can fit for on-the-fly # for each slice separately (for the same m/n). _nll = 1e10 nnf = -1 k = y[_q] * np.diff(bins)[_q] * num # Ramanujan, The Lost Notebook and other Unpublished Papers, gives # an approximation for ln(n!) as n ln(n) - n + 1/6 ln(8 n^3 + # 4 n^2 + n + 1/30) + ln(pi)/2. We use this, to avoid overflowing # n!, above n=100 log_fac_k = np.empty(len(k), float) log_fac_k[k < 100] = np.log(factorial(k[k < 100])) log_fac_k[k >= 100] = k[k >= 100] * np.log(k[k >= 100]) - k[k >= 100] + 1/6 * np.log( 8 * k[k >= 100]**3 + 4 * k[k >= 100]**2 + k[k >= 100] + 1/30) + np.log(np.pi)/2 for _nnf in np.linspace(0, 1, 101): modely = _nnf * reduced_nn_model + (1 - _nnf) * reduced_m_conv_plus_nn # Poisson is exp(-L) L**k / k!, so negative log-likelihood is # L - k*ln(L) + ln(k!). Have to convert from PDF to counts through # bin width and number of objects, forcing a non-zero rate to avoid # logarithmic issues. _l = modely[_q] * np.diff(bins)[_q] * num _l[_l <= 1e-10] = 1e-10 temp_neg_log_like = np.sum(_l - k*np.log(_l) + log_fac_k) if temp_neg_log_like < _nll: nnf = _nnf _nll = temp_neg_log_like modely = nnf * reduced_nn_model + (1 - nnf) * reduced_m_conv_plus_nn k = y[_q] * np.diff(bins)[_q] * num _l = modely[_q] * np.diff(bins)[_q] * num _l[_l <= 1e-10] = 1e-10 neg_log_like_part = np.sum(_l - k*np.log(_l) + log_fac_k) return neg_log_like_part
[docs] def plot_fits_calculate_cdfs(self): # pylint: disable=too-many-locals,too-many-statements """ Calculate poisson CDFs and create verification plots showing the quality of the fits. """ mn_poisson_cdfs = np.array([], float) ind_poisson_cdfs = np.array([], float) t = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") if self.make_plots: print(f'{t}, {self.file_name}: Creating individual AUF figures and calculating ' 'goodness-of-fits...') else: print(f'{t}, {self.file_name}: Calculating goodness-of-fits...') sys.stdout.flush() if self.make_plots: # Grid just big enough square to cover mag_array entries. gs1 = self.make_gridspec('34242b', self.n_mag_rows, self.n_mag_cols, 0.8, 5) ax1s = [plt.subplot(gs1[i]) for i in range(len(self.mag_array))] b_matches = self.b[self.bmatch] p_ind = self.unc_index if self.use_photometric_uncertainties else self.correct_astro_mag_indices_index for i, mag in enumerate(self.mag_array): if self.skip_flags[i]: continue if self.make_plots: ax = ax1s[i] pdf, pdf_uncert, q_pdf, pdf_bin, num_pdf = ( self.pdfs[i], self.pdf_uncerts[i], self.q_pdfs[i], self.pdf_bins[i], self.nums[i]) if self.make_plots: ax.errorbar((pdf_bin[:-1]+np.diff(pdf_bin)/2)[q_pdf], pdf[q_pdf], yerr=pdf_uncert[q_pdf], c='k', marker='.', zorder=1, ls='None') mag_ind = self.mag_indices[p_ind] pos_err_ind = self.pos_and_err_indices[0][2] mag_cut = ((b_matches[:, mag_ind] <= mag+self.mag_slice[i]) & (b_matches[:, mag_ind] >= mag-self.mag_slice[i])) bsig = np.percentile(b_matches[mag_cut, pos_err_ind], 50) sig_cut = ((b_matches[:, pos_err_ind] <= bsig+self.sig_slice[i]) & (b_matches[:, pos_err_ind] >= bsig-self.sig_slice[i])) _h, _b = np.histogram(self.narray[self.bmatch][mag_cut & sig_cut], bins='auto') moden = (_b[:-1]+np.diff(_b)/2)[np.argmax(_h)] dn = 0.05*moden n_cut = (self.narray[self.bmatch] >= moden-dn) & (self.narray[self.bmatch] <= moden+dn) if not self.use_photometric_uncertainties: final_slice = sig_cut & mag_cut & n_cut & (self.dists <= 20*bsig) else: # Compare the photometric-error-based astrometric uncertainty for # a 20-sigma cut, but in cases where this is disconnected from # the astrometry we floor this at the median distance of the # first three cuts. if np.sum(sig_cut & mag_cut & n_cut) > 0: med_dist = np.median(self.dists[sig_cut & mag_cut & n_cut]) compare_sig = max(self.psfsig*bsig, med_dist) final_slice = sig_cut & mag_cut & n_cut & (self.dists <= 20*compare_sig) else: final_slice = sig_cut & mag_cut & n_cut avg_a_dens = len(self.a) / self.area density = (np.percentile(self.narray[self.bmatch][final_slice], 50) + avg_a_dens) / 3600**2 nn_model = 2 * np.pi * (self.r[:-1]+self.dr/2) * density * np.exp( -np.pi * (self.r[:-1]+self.dr/2)**2 * density) fit_sig = self.fit_sigs[i, 0] _snr = self.b[:, self.snr_indices[p_ind]] p = ((_snr > 0) & ~np.isnan(_snr) & np.isfinite(_snr)) n_sources_inv_max_snr = 1000 inv_max_snr = 1 / np.percentile(_snr[p][np.argsort(_snr[p])][n_sources_inv_max_snr:], 50) h = 1 - np.sqrt(1 - min(1, inv_max_snr**2 * self.avg_snr[i, 0]**2)) ind_fit_sig = self.fit_sigs[i, 1] if self.make_plots: ax = ax1s[i] _quoted_sig = (self.avg_sig[i, 0] if not self.use_photometric_uncertainties else self.psf_fwhm * self.avg_sig[i, 0]) for j, (sig, _h, ls) in enumerate(zip( [fit_sig, fit_sig, fit_sig, _quoted_sig, _quoted_sig, _quoted_sig, ind_fit_sig, ind_fit_sig, ind_fit_sig], [h, 1, 0, h, 1, 0, h, 1, 0], ['r-', 'r-.', 'r:', 'k-', 'k-.', 'k:', 'c-', 'c-.', 'c:'])): if not self.make_plots and j != 0: continue four_gauss = np.exp(-2 * np.pi**2 * (self.rho[:-1]+self.drho/2)**2 * sig**2) four_hist = _h * self.four_off_fw[:, i] + (1 - _h) * self.four_off_ps[:, i] convolve_hist = paf.fourier_transform( four_hist*four_gauss, self.rho[:-1]+self.drho/2, self.drho, self.j0s) convolve_hist_dr = convolve_hist * np.pi * ( self.r[1:]**2 - self.r[:-1]**2) / self.dr m_conv_plus_nn = (convolve_hist_dr * (np.exp(-np.pi * (self.r[:-1]+self.dr/2)**2 * density)) + nn_model * (1 - np.cumsum(convolve_hist_dr * self.dr))) reduced_nn_model, _, _ = binned_statistic(self.r[:-1]+self.dr/2, nn_model, bins=pdf_bin) reduced_m_conv_plus_nn, _, _ = binned_statistic(self.r[:-1]+self.dr/2, m_conv_plus_nn, bins=pdf_bin) if j in [0, 3, 6]: _nll = 1e10 nn_frac = -1 k = pdf[q_pdf] * np.diff(pdf_bin)[q_pdf] * num_pdf log_fac_k = np.empty(len(k), float) log_fac_k[k < 100] = np.log(factorial(k[k < 100])) log_fac_k[k >= 100] = k[k >= 100] * np.log(k[k >= 100]) - k[k >= 100] + 1/6 * np.log( 8 * k[k >= 100]**3 + 4 * k[k >= 100]**2 + k[k >= 100] + 1/30) + np.log(np.pi)/2 for _nnf in np.linspace(0, 1, 101): modely = _nnf * reduced_nn_model + (1 - _nnf) * reduced_m_conv_plus_nn _l = modely[q_pdf] * np.diff(pdf_bin)[q_pdf] * num_pdf _l[_l <= 1e-10] = 1e-10 temp_neg_log_like = np.sum(_l - k*np.log(_l) + log_fac_k) if temp_neg_log_like < _nll: nn_frac = _nnf _nll = temp_neg_log_like if j == 0: nn_frac_mn = nn_frac if j == 3: nn_frac_quot = nn_frac if j == 6: nn_frac_ind = nn_frac modely = nn_frac * reduced_nn_model + (1 - nn_frac) * reduced_m_conv_plus_nn if j in [0, 3, 6]: _l = modely[q_pdf] * np.diff(pdf_bin)[q_pdf] * num_pdf _l[_l <= 1e-10] = 1e-10 p_cdf = np.empty(len(_l), float) for _i in range(len(_l)): # pylint: disable=consider-using-enumerate p_cdf[_i] = poisson.cdf(k=k[_i], mu=_l[_i]) if j == 0: mn_poisson_cdfs = np.append(mn_poisson_cdfs, p_cdf) if j == 6: ind_poisson_cdfs = np.append(ind_poisson_cdfs, p_cdf) if self.make_plots: if j in [0, 3, 6]: sig_type = 'fit' if j == 0 else 'quoted' if j == 3 else 'ind' sig_val = fit_sig if j == 0 else _quoted_sig if j == 3 else ind_fit_sig f_val = nn_frac_mn if j == 0 else nn_frac_quot if j == 3 else nn_frac_ind h_str = f', H = {h:.2f}' if j == 0 else '' if usetex: lab = rf'$\sigma_\mathrm{{{sig_type}}}$ = {sig_val:.4f}", F = {f_val:.2f}{h_str}' else: lab = rf'sigma_{sig_type} = {sig_val:.4f}", F = {f_val:.2f}{h_str}' else: lab = '' # Have to make a new "q" filter variable here, since we want # to use the full-resolution model rather than a binned one # for calculating our normalisation. model_q = self.r[:-1] < pdf_bin[1:][pdf > 0][-1] full_modely = nn_frac * nn_model + (1 - nn_frac) * m_conv_plus_nn modely_norm = np.sum(full_modely[model_q] * self.dr[model_q]) ax.plot((pdf_bin[:-1]+np.diff(pdf_bin)/2)[q_pdf], modely[q_pdf] / modely_norm, ls, label=lab) if self.make_plots: ax.legend(fontsize=8) ax.set_xlabel('Radius / arcsecond') if usetex: ax.set_ylabel('PDF / arcsecond$^{-1}$') else: ax.set_ylabel('PDF / arcsecond^-1') if self.make_plots: plt.tight_layout() plt.savefig(f'{self.save_folder}/pdf/{self.magnitude_append}auf_fits_{self.file_name}.pdf') plt.close() return mn_poisson_cdfs, ind_poisson_cdfs
[docs] def plot_snr_mag_sig(self): # pylint: disable=too-many-statements """ Generate 2-D histograms of SNR, quoted/fit astrometric uncertainty, and photometric magnitude. """ t = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"{t}, {self.file_name}: Plotting SNR-Magnitude-Uncertainty scaling relations...") sys.stdout.flush() p_ind = self.unc_index if self.use_photometric_uncertainties else self.correct_astro_mag_indices_index p = ((self.b[:, self.snr_indices[p_ind]] > 0) & ~np.isnan(self.b[:, self.snr_indices[p_ind]]) & np.isfinite(self.b[:, self.snr_indices[p_ind]])) _snr = self.b[p, self.snr_indices[p_ind]] obj_err = self.b[p, self.pos_and_err_indices[0][2]] obj_mag = self.b[p, self.mag_indices[p_ind]] gs = self.make_gridspec('sig_vs_snr_vs_mag', 1, 3, 0.8, 6) mag_label = f' ({self.mag_names[self.unc_index]})' if self.use_photometric_uncertainties else '' ax = plt.subplot(gs[0]) q = (obj_err < 1) & (_snr > 1) & (obj_err > 0) log_inv_snr = np.log10(1 / _snr[q]) log_err = np.log10(obj_err[q]) h, x, y = np.histogram2d(log_inv_snr, log_err, bins=(100, 101)) ax.pcolormesh(x, y, h.T, edgecolors='face', cmap='viridis', rasterized=True) ylims = ax.get_ylim() xlims = ax.get_xlim() if len(self.seeing_ranges) == 1: ls_list = ['k-'] elif len(self.seeing_ranges) == 2: ls_list = ['r--', 'k-'] elif len(self.seeing_ranges) == 3: ls_list = ['r--', 'k-', 'r-'] if not self.use_photometric_uncertainties: # pylint: disable-next=possibly-used-before-assignment for seeing, ls in zip(self.seeing_ranges, ls_list): log_ks = np.log10(seeing / (2 * np.sqrt(2 * np.log(2)))) ax.plot(x, log_ks + x, ls, label=rf'seeing = {seeing:.1f} arcsec') else: ax.plot(x, x, 'k-') q = ~self.skip_flags if np.sum(q) > 0: man_snr = self.avg_snr[q, 0] man_sig_quoted = (self.avg_sig[q, 0] if not self.use_photometric_uncertainties else self.psf_fwhm * self.avg_sig[q, 0]) # Here we want the second column of fit_sigs, the individual # derivations of astrometric uncertainty. man_sig_fit = self.fit_sigs[q, 1] man_log_inv_snr = np.log10(1/man_snr) man_log_err_fit = np.log10(man_sig_fit) man_log_err_quoted = np.log10(man_sig_quoted) for _index in range(len(man_log_inv_snr)): # pylint: disable=consider-using-enumerate ax.arrow(man_log_inv_snr[_index], man_log_err_quoted[_index], 0, man_log_err_fit[_index]-man_log_err_quoted[_index], color='k', zorder=30, head_width=0.06, head_length=0.03) ax.set_xlim(*xlims) ax.set_ylim(*ylims) if not self.use_photometric_uncertainties: ax.legend(fontsize=10) if usetex: ax.set_xlabel(r'$\log_{10}$(1 / SNR)') else: ax.set_xlabel(r'log10(1 / SNR)') if usetex: ax.set_ylabel(rf'$\log_{10}$(Quoted{mag_label} uncertainty / arcsecond)') else: ax.set_ylabel(rf'log10(Quoted{mag_label} uncertainty / arcsecond)') ax = plt.subplot(gs[1]) q = (obj_err < 1) & (_snr > 1) & (obj_err > 0) & ~np.isnan(obj_mag) log_inv_snr = np.log10(1 / _snr[q]) h, x, y = np.histogram2d(log_inv_snr, obj_mag[q], bins=(100, 101)) ax.pcolormesh(x, y, h.T, edgecolors='face', cmap='viridis', rasterized=True) ylims = ax.get_ylim() xlims = ax.get_xlim() q = ~self.skip_flags if np.sum(q) > 0: man_snr = self.avg_snr[q, 0] man_mag = self.mag_array[q] man_log_inv_snr = np.log10(1/man_snr) ax.plot(man_log_inv_snr, man_mag, 'kx') # m = -2.5 log10(flux), SNR = flux/B, thus # m = -2.5 log10(B) - 2.5 log10(SNR), so # m = -2.5 log10(B) + 2.5 x. # Pin x and y (m) to the last data point for scaling. _i = np.argmax(man_mag) log_B = (man_mag[_i] - (2.5 * man_log_inv_snr[_i])) / (-2.5) # pylint: disable=invalid-name ax.plot(x, 2.5 * x - 2.5 * log_B, 'k-', label='Background-dominated') # Now SNR = sqrt(flux), if flux_err = sqrt(flux). # Thus m = -2.5 log10(SNR^2) = -5 log10(SNR) = 5 x. # This also needs some kind of + C since we don't zeropoint, though. _j = np.argmin(man_mag) C = man_mag[_j] - (5 * man_log_inv_snr[_j]) ax.plot(x, 5 * x + C, 'r--', label='Photon-limited') ax.set_xlim(*xlims) ax.set_ylim(*ylims) if np.sum(q) > 0: ax.legend(fontsize=10) if usetex: ax.set_xlabel(r'$\log_{10}$(1 / SNR)') else: ax.set_xlabel(r'log10(1 / SNR)') ax.set_ylabel(f'{self.mag_names[p_ind]} / mag') ax = plt.subplot(gs[2]) q = (obj_err < 1) & (_snr > 1) & (obj_err > 0) & ~np.isnan(obj_mag) log_err = np.log10(obj_err[q]) h, x, y = np.histogram2d(obj_mag[q], log_err, bins=(100, 101)) ax.pcolormesh(x, y, h.T, edgecolors='face', cmap='viridis', rasterized=True) ylims = ax.get_ylim() xlims = ax.get_xlim() q = ~self.skip_flags if np.sum(q) > 0: man_mag = self.mag_array[q] man_sig_quoted = (self.avg_sig[q, 0] if not self.use_photometric_uncertainties else self.psf_fwhm * self.avg_sig[q, 0]) # Remember, individually fit not parameterisation. man_sig_fit = self.fit_sigs[q, 1] man_log_err_fit = np.log10(man_sig_fit) man_log_err_quoted = np.log10(man_sig_quoted) for _index in range(len(man_mag)): # pylint: disable=consider-using-enumerate ax.arrow(man_mag[_index], man_log_err_quoted[_index], 0, man_log_err_fit[_index]-man_log_err_quoted[_index], color='k', zorder=30, head_width=0.06, head_length=0.03) ax.set_xlim(*xlims) ax.set_ylim(*ylims) ax.set_xlabel(f'{self.mag_names[p_ind]} / mag') if usetex: ax.set_ylabel(rf'$\log_{10}$(Quoted{mag_label} uncertainty / arcsecond)') else: ax.set_ylabel(rf'log10(Quoted{mag_label} uncertainty / arcsecond)') plt.tight_layout() plt.savefig(f'{self.save_folder}/pdf/{self.magnitude_append}' f'histogram_mag_vs_sig_vs_snr_{self.file_name}.pdf') plt.close()
[docs] def finalise_summary_plots(self): """ After running all of the sightlines' fits, generate final summary plots of the sig-sig relations, quality of fits, and resulting "m" and "n" scaling parameters. """ loop_len = len(self.mag_names) if self.use_photometric_uncertainties else 1 plt.figure('single_sigsig') for unc_index in range(loop_len): x_array = np.linspace(0, self.ax_b_sing[unc_index].get_xlim()[1], 1000) self.ax_b_sing[unc_index].plot(x_array, x_array, 'g:') self.ax_b_sing[unc_index].set_ylim(0.95 * self.ylims_sing[0], 1.05 * self.ylims_sing[1]) self.ax_b_log_sing[unc_index].plot(x_array, x_array, 'g:') self.ax_b_log_sing[unc_index].set_xscale('log') self.ax_b_log_sing[unc_index].set_yscale('log') if usetex: if not self.use_photometric_uncertainties: self.ax_b_sing[unc_index].set_xlabel(r'Quoted astrometric $\sigma$ / arcsecond') self.ax_b_log_sing[unc_index].set_xlabel(r'Quoted astrometric $\sigma$ / arcsecond') else: self.ax_b_sing[unc_index].set_xlabel( rf'Quoted photometric {self.mag_names[unc_index]} $\sigma$ / arcsecond') self.ax_b_log_sing[unc_index].set_xlabel( rf'Quoted photometric {self.mag_names[unc_index]} $\sigma$ / arcsecond') self.ax_b_sing[unc_index].set_ylabel(r'Individually-fit astrometric $\sigma$ / arcsecond') self.ax_b_log_sing[unc_index].set_ylabel(r'Individually-fit astrometric $\sigma$ / arcsecond') else: if not self.use_photometric_uncertainties: self.ax_b_sing[unc_index].set_xlabel(r'Quoted astrometric sigma / arcsecond') self.ax_b_log_sing[unc_index].set_xlabel(r'Quoted astrometric sigma / arcsecond') else: self.ax_b_sing[unc_index].set_xlabel( rf'Quoted photometric {self.mag_names[unc_index]} sigma / arcsecond') self.ax_b_log_sing[unc_index].set_xlabel( rf'Quoted photometric {self.mag_names[unc_index]} sigma / arcsecond') self.ax_b_sing[unc_index].set_ylabel(r'Individually-fit astrometric sigma / arcsecond') self.ax_b_log_sing[unc_index].set_ylabel(r'Individually-fit astrometric sigma / arcsecond') plt.savefig(f'{self.save_folder}/pdf/summary_individual_sig_vs_sig.pdf') plt.close() gs = self.make_gridspec('fits_cdf', loop_len, 2, 0.8, 6) for unc_index in range(loop_len): if self.use_photometric_uncertainties: _m, _i = self.mn_poisson_cdfs[:, unc_index], self.ind_poisson_cdfs[:, unc_index] else: _m, _i = self.mn_poisson_cdfs, self.ind_poisson_cdfs for i, (f, label) in enumerate(zip([_m, _i], ['HyperParameter', 'Individual'])): ax_d = plt.subplot(gs[unc_index, i]) for j, g in enumerate(f): if g is None: continue sys.stdout.flush() q = ~np.isnan(g) p_cdf = g[q] # Under the hypothesis all CDFs are "true" we should expect # the distribution of CDFs to be linear with fraction of the way # through the sorted list of CDFs -- that is, the CDF ranked 30% # in order should be ~0.3. q_sort = np.argsort(p_cdf) filter_log_nans = p_cdf[q_sort] < 1 true_hypothesis_cdf_dist = (np.arange(1, len(p_cdf)+1, 1) - 0.5) / len(p_cdf) c = self.cols[j % len(self.cols)] ax_d.plot(np.log10(1 - p_cdf[q_sort][filter_log_nans]), true_hypothesis_cdf_dist[filter_log_nans], ls='None', c=c, marker='.') ax_d.plot(np.log10(1 - true_hypothesis_cdf_dist), true_hypothesis_cdf_dist, 'r--') if usetex: if not self.use_photometric_uncertainties: ax_d.set_xlabel(rf'$\log_{{10}}(1 - \mathrm{{{label} CDF}})$') else: ax_d.set_xlabel(rf'$\log_{{10}}(1 - \mathrm{{{label} CDF}}) ' rf'({self.mag_names[unc_index]})$') else: if not self.use_photometric_uncertainties: ax_d.set_xlabel(rf'log10(1 - {label} CDF)') else: ax_d.set_xlabel(rf'log10(1 - {label} CDF) ({self.mag_names[unc_index]})') ax_d.set_ylabel('Fraction') plt.savefig(f'{self.save_folder}/pdf/summary_mn_ind_cdfs.pdf') plt.close() self.gs_mn_sky = self.make_gridspec('mn_sky', 2*loop_len, 2, 0.8, 5) for unc_index in range(loop_len): if self.use_photometric_uncertainties: _m, _n = self.mn_sigs[:, unc_index, 0], self.mn_sigs[:, unc_index, 1] else: _m, _n = self.mn_sigs[:, 0], self.mn_sigs[:, 1] mag_label = f' ({self.mag_names[unc_index]})' if self.use_photometric_uncertainties else '' for i, (f, label) in enumerate(zip([_m, _n], [f'm{mag_label}', f'n / arcsecond{mag_label}'])): ax = plt.subplot(self.gs_mn_sky[2*unc_index, i]) img = ax.scatter(self.ax1_mids, self.ax2_mids, c=f, cmap='viridis') c = plt.colorbar(img, ax=ax, use_gridspec=True) c.ax.set_ylabel(label) ax1_name = 'l' if self.coord_system == 'galactic' else 'RA' ax2_name = 'b' if self.coord_system == 'galactic' else 'Dec' ax.set_xlabel(f'{ax1_name} / deg') ax.set_ylabel(f'{ax2_name} / deg') xlims = ax.get_xlim() ylims = ax.get_ylim() # Plot the Galactic and Ecliptic planes. for _b in [-20, 0, 20]: l = np.linspace(0, 360, 10000) b = np.zeros_like(l) + _b c = SkyCoord(l=l, b=b, unit='deg', frame='galactic') ra, dec = c.icrs.ra.degree, c.icrs.dec.degree q = np.argsort(ra) ra, dec = ra[q], dec[q] ax.plot(ra, dec, ls='-', c='k' if _b == 0 else 'grey', alpha=0.7, zorder=-1) for _lat in [-20, 0, 20]: lon = np.linspace(0, 360, 10000) lat = np.zeros_like(lon) + _lat c = SkyCoord(lon=lon, lat=lat, unit='deg', frame='barycentricmeanecliptic') ra, dec = c.icrs.ra.degree, c.icrs.dec.degree q = np.argsort(ra) ra, dec = ra[q], dec[q] ax.plot(ra, dec, ls='--', c='k' if _lat == 0 else 'grey', alpha=0.7, zorder=-1) ax.set_xlim(*xlims) ax.set_ylim(*ylims) ax = plt.subplot(self.gs_mn_sky[2*unc_index+1, i]) ax.plot(self.avg_b_dens[:, unc_index] if self.use_photometric_uncertainties else self.avg_b_dens, f, 'k.') if usetex: ax.set_xlabel(rf'Average density / deg$^{-2}${mag_label}') else: ax.set_xlabel(f'Average density / deg^-2{mag_label}') ax.set_ylabel(label) plt.tight_layout() plt.savefig(f'{self.save_folder}/pdf/summary_mn_sky.pdf') plt.close()
[docs] def load_catalogue(self, cat_type, sub_cat_id): """ Load specific sightline's catalogue, accounting for catalogue "a" vs "b", filetype, and method by which sightlines are divided. Parameters ---------- cat_type : string, "a" or "b" Identifier for which of the two catalogues to load. sub_cat_id : list Contains the variables to format the name of the catalogue with, in the sense of ``string.format(x, y, ...)``. Should either contain ax1 and ax2, or chunk ID, depending on ``coord_or_chunk``. Returns ------- x : numpy.ndarray The catalogue's dataset. """ name = (self.a_cat_name.format(*sub_cat_id) if cat_type == 'a' else self.b_cat_name.format(*sub_cat_id)) if self.npy_or_csv == 'npy': x = np.load(name) else: cols = self.a_cols if cat_type == 'a' else self.b_cols x = np.genfromtxt(name, delimiter=',', usecols=cols) return x
def create_distances(a, b, nn_radius, a_ax1_ind, a_ax2_ind, b_ax1_ind, b_ax2_ind, coord_system): """ Calculate nearest neighbour matches between two catalogues. Parameters ---------- a : numpy.ndarray Array containing catalogue "a"'s sources. Must have astrometric coordinates as two of its columns. b : numpy.ndarray Catalogue "b"'s object array. Longitude and latitude must be two columns in the array. nn_radius : float Maximum match radius within which to consider potential counterpart assignments, in arcseconds. a_ax1_ind : integer Index into ``a`` of the longitude data. a_ax2_ind : integer ``a`` index for latitude column. b_ax1_ind : integer Longitude index in the ``b`` dataset. b_ax2_ind : integer Index into ``b`` that holds the latitude data. coord_system : string Sets coordinate system to equatorial or galactic. Returns ------- amatch : numpy.ndarray Indices in catalogue ``a`` that have a match to a source in catalogue ``b`` within ``nn_radius`` that the opposite object has as its nearest neighbour as well. ``amatch[i]`` pairs with ``bmatch[i]`` for all ``i``. bmatch : numpy.ndarray The indices into catalogue ``b`` that have a pair in catalogue ``a``, in an elementwise match between ``amatch`` and ``bmatch``. dists : numpy.ndarray The separations between each nearest neighbour match, in arcseconds. """ if coord_system == 'galactic': ac = SkyCoord(l=a[:, a_ax1_ind], b=a[:, a_ax2_ind], unit='deg', frame='galactic') bc = SkyCoord(l=b[:, b_ax1_ind], b=b[:, b_ax2_ind], unit='deg', frame='galactic') else: ac = SkyCoord(ra=a[:, a_ax1_ind], dec=a[:, a_ax2_ind], unit='deg', frame='icrs') bc = SkyCoord(ra=b[:, b_ax1_ind], dec=b[:, b_ax2_ind], unit='deg', frame='icrs') amatchind, adists, _ = match_coordinates_sky(ac, bc) bmatchind, bdists, _ = match_coordinates_sky(bc, ac) # Since match_coordinates_sky doesn't set a maximum cutoff, we manually # ensure only matches within nn_radius return by setting larger matches # to unique negative indices, so that found_match_slice and # found_match_slice2 fail for those later. q = adists.arcsecond > nn_radius amatchind[q] = -1 q = bdists.arcsecond > nn_radius bmatchind[q] = -2 adists = adists.arcsecond _ainds = np.arange(0, len(a), dtype=int) found_match_slice = amatchind >= 0 found_match_slice2 = _ainds[found_match_slice] == bmatchind[amatchind[found_match_slice]] # The indices should swap here. amatchind is the catalogue b indices # for each catalogue a object, but we really just care about which # catalogue b objects are matched. bmatch = amatchind[found_match_slice][found_match_slice2] amatch = _ainds[found_match_slice][found_match_slice2] dists = adists[found_match_slice][found_match_slice2] return amatch, bmatch, dists