Source code for macauff.parse_catalogue

# Licensed under a 3-clause BSD style license - see LICENSE
'''
This module provides functionality to convert input photometric catalogues
to the required numpy binary files used within the cross-match process.
'''

import os

import numpy as np
import pandas as pd
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.time import Time
from numpy.lib.format import open_memmap

# pylint: disable=import-error,no-name-in-module
from macauff.misc_functions import _load_rectangular_slice
from macauff.misc_functions_fortran import misc_functions_fortran as mff

# pylint: enable=import-error,no-name-in-module

__all__ = ['csv_to_npy', 'rect_slice_npy', 'npy_to_csv', 'rect_slice_csv']


def load_csv(self, which):
    """
    Load one side of the cross-match's .csv catalogue, parsing for whether
    we previously ran the astrometric correction pipeline and need to create
    new uncertainties or not.

    Parameters
    ----------
    which : string
        Either an 'a' or a 'b', for which side of the match is being loaded.
    """
    if not os.path.isfile(getattr(self, f'{which}_cat_csv_file_path')):
        raise FileNotFoundError(f'Catalogue file not found in catalogue "{which}" path. '
                                'Please ensure photometric catalogue is correctly saved.')

    if self.include_perturb_auf or getattr(self, f'{which}_correct_astrometry'):
        snr_cols = getattr(self, f'{which}_snr_indices')
    else:
        snr_cols = None
    if getattr(self, f'{which}_apply_proper_motion'):
        if isinstance(getattr(self, f'{which}_ref_epoch_or_index'), str):
            pm_cols = getattr(self, f'{which}_pm_indices')
            pm_ref_epoch = getattr(self, f'{which}_ref_epoch_or_index')
        else:
            pm_cols = np.append(getattr(self, f'{which}_pm_indices'),
                                getattr(self, f'{which}_ref_epoch_or_index'))
            pm_ref_epoch = None
        pm_move_to_epoch = self.move_to_epoch
    else:
        pm_cols, pm_ref_epoch, pm_move_to_epoch = None, None, None
    if getattr(self, f'{which}_correct_astrometry'):
        x = csv_to_npy(
            getattr(self, f'{which}_cat_csv_file_path'), getattr(self, f'{which}_pos_and_err_indices')[0],
            getattr(self, f'{which}_mag_indices'), getattr(self, f'{which}_best_mag_index_col'),
            getattr(self, f'{which}_chunk_overlap_col'), snr_cols=snr_cols, header=False,
            process_uncerts=True,
            astro_sig_fits_filepath=f'{getattr(self, f"{which}_correct_astro_save_folder")}/npy',
            cat_in_radec=getattr(self, f'{which}_auf_region_frame') == 'equatorial',
            mn_in_radec=getattr(self, f'{which}_auf_region_frame') == 'equatorial', pm_cols=pm_cols,
            pm_ref_epoch=pm_ref_epoch, pm_move_to_epoch=pm_move_to_epoch)
    else:
        x = csv_to_npy(
            getattr(self, f'{which}_cat_csv_file_path'), getattr(self, f'{which}_pos_and_err_indices'),
            getattr(self, f'{which}_mag_indices'), getattr(self, f'{which}_best_mag_index_col'),
            getattr(self, f'{which}_chunk_overlap_col'), snr_cols=snr_cols, header=False, pm_cols=pm_cols,
            pm_ref_epoch=pm_ref_epoch, pm_move_to_epoch=pm_move_to_epoch)
    if snr_cols is not None:
        # pylint: disable-next=unbalanced-tuple-unpacking
        f1, f2, f3, f4, f5 = x
        setattr(self, f'{which}_snr', f5)
    else:
        # pylint: disable-next=unbalanced-tuple-unpacking
        f1, f2, f3, f4 = x
    setattr(self, f'{which}_astro', f1)
    setattr(self, f'{which}_photo', f2)
    setattr(self, f'{which}_magref', f3)
    setattr(self, f'{which}_in_overlaps', f4)


# pylint: disable-next=too-many-branches,too-many-statements,too-many-locals
[docs] def csv_to_npy(input_filename, astro_cols, photo_cols, bestindex_col, chunk_overlap_col, snr_cols=None, header=False, process_uncerts=False, astro_sig_fits_filepath=None, cat_in_radec=None, mn_in_radec=None, pm_cols=None, pm_ref_epoch=None, pm_move_to_epoch=None): ''' Convert a .csv file representation of a photometric catalogue into the appropriate .npy binary files used in the cross-matching process. Parameters ---------- input_filename : string Location on disk, including extension, where the catalogue .csv file is stored that is to be converted into numpy arrays. astro_cols : list or numpy.array of integers List of zero-indexed columns in the input catalogue representing the three required astrometric parameters, two orthogonal sky axis coordinates and a single, circular astrometric precision. The first two columns of ``astro_cols`` must match in order the first two columns of the output astrometry binary file. For cases where ``process_uncerts`` is ``True``, the last ``N`` columns must either be a single astrometric uncertainty, for when catalogue-given astrometric uncertainties are available and were used to parameterise the position-residuals in the catalogue, or a list of ``N`` photometric uncertainty bands, in which case there will be ``N`` band-based parameterisations of the astrometry and the ``bestindex_col`` flags will be used to determine which parameterisation to use for each source individually. photo_cols : list or numpy.ndarray of integers List of zero-indexed columns in the input catalogue representing the magnitudes of each photometric source to be used in the cross-matching. bestindex_col : integer Zero-indexed column of the flag indicating which of the available photometric brightnesses (represented by ``photo_cols``) is the preferred choice -- usually the most precise and highest quality detection. chunk_overlap_col : integer Zero-indexed column in the .csv file containing an integer representation of the boolean representation of whether sources are in the "halo" (``1`` in the .csv) or "core" (``0``) of the region. If ``None`` then all objects are assumed to be in the core. snr_cols : list or numpy.ndarray of integers List of zero-indexed columns in the input catalogue representing the signal-to-noise ratios of each detection corresponding to those same magnitudes in ``photo_cols``. header : boolean, optional Flag indicating whether the .csv file has a first line with the names of the columns in it, or whether the first line of the file is the first line of the dataset. process_uncerts : boolean, optional Determines whether uncertainties are re-processed in light of astrometric fitting on large scales. astro_sig_fits_filepath : string, optional Location on disk of the two saved files that contain the parameters that describe best-fit astrometric precision as a function of quoted astrometric precision. Must be provided if ``process_uncerts`` is ``True``. cat_in_radec : boolean, optional If ``process_uncerts`` is ``True``, must be provided, and either be ``True`` or ``False``, indicating whether the catalogue being processed is in RA-Dec coordinates or not. If ``True``, coordinates of mid-points for derivations of ``m`` and ``n`` for quoted-fit uncertainty relations will be converted from Galactic Longitude and Latitude to Right Ascension and Declination for the purposes of nearest-neighbour use, if ``mn_in_radec`` is ``False`` (and m-n coordinates are in l/b). mn_in_radec : boolean, optional If ``process_uncerts`` is ``True``, must be provided, and similar to ``cat_in_radec`` is a flag indcating whether the coordinates used to compute m-n scaling relations are in RA/Dec or not. If ``mn_in_radec`` disagrees with ``cat_in_radec`` then m-n coordinates will be converted to the coordinate system of the catalogue. pm_cols : list or numpy.ndarray of integers, optional If given, must contain the two orthogonal sky-axis proper motions' indices for the given dataset, to be loaded along with positions, SNRs, photometry, etc. Depending on whether ``pm_ref_epoch`` is also given, must additionally contain the index into the column in the data holding the individial sources' date of observation as the final of three elements. Must be ``None``, as per the default value, to force skipping of element loading. pm_ref_epoch : string, optional If ``pm_cols`` is of length two then this must be given, but if ``pm_cols`` is three elements must not be given. If provided, must be a single string, valid for loading into astropy's Time function. Otherwise, ``pm_cols`` index must contain astropy Time-valid strings in its dataset column. pm_move_to_epoch : string, optional If ``pm_cols`` is provided, regardless of its length, this must be given, and must always contain a single, astropy Time valid, string. Returns ------- astro : numpy.ndarray Three-elements per source, shape ``(N, 3)``, longitude, latitude, and (circular) astrometric uncertainty for every object in the catalogue. photo : numpy.ndarray Photometry of all objects in the catalogue, also length ``N`` in its first axis and then ``M`` photometric bands per object. best_index : numpy.ndarray Indices, ``0``-``M-1``, indicating which of the ``M`` detections is the preferred band for every object. chunk_overlap : numpy.ndarray Boolean flag, indicating whether an object is in the "chunk" or whether it has been included in a halo around the primary chunk objects for match purposes, but is a primary detection in a different chunk of this catalogue. snrs : numpy.ndarray, optional If ``snr_cols`` are provided, also returns the signal-to-noise ratios for each ``photo`` detection for each source. ''' if not (process_uncerts is True or process_uncerts is False): raise ValueError("process_uncerts must either be True or False.") if process_uncerts and astro_sig_fits_filepath is None: raise ValueError("astro_sig_fits_filepath must given if process_uncerts is True.") if process_uncerts and cat_in_radec is None: raise ValueError("cat_in_radec must given if process_uncerts is True.") if process_uncerts and mn_in_radec is None: raise ValueError("mn_in_radec must given if process_uncerts is True.") if process_uncerts and not (cat_in_radec is True or cat_in_radec is False): raise ValueError("If process_uncerts is True, cat_in_radec must either be True or False.") if process_uncerts and not (mn_in_radec is True or mn_in_radec is False): raise ValueError("If process_uncerts is True, mn_in_radec must either be True or False.") if process_uncerts and not os.path.exists(astro_sig_fits_filepath): raise ValueError("process_uncerts is True but astro_sig_fits_filepath does not exist. " "Please ensure file path is correct.") if pm_cols is not None and pm_move_to_epoch is None: raise ValueError("pm_move_to_epoch cannot be None if pm_cols is given.") if pm_cols is not None and (len(pm_cols) < 2 or len(pm_cols) > 3): raise ValueError("pm_cols must either be of length two or three if supplied.") if pm_cols is not None and len(pm_cols) == 2 and pm_ref_epoch is None: raise ValueError("pm_ref_epoch cannot be None if pm_cols is supplied and of length 2.") if pm_cols is not None and len(pm_cols) == 3 and pm_ref_epoch is not None: raise ValueError("If proper motions are to be applied and pm_cols has three elements then " "pm_ref_epoch cannot be given as well.") astro_cols, photo_cols = np.array(astro_cols), np.array(photo_cols) with open(input_filename, encoding='utf-8') as fp: n_rows = 0 if not header else -1 for _ in fp: n_rows += 1 astro = np.empty((n_rows, 3), float) photo = np.empty((n_rows, len(photo_cols)), float) best_index = np.empty(n_rows, int) chunk_overlap = np.empty(n_rows, bool) if snr_cols is not None: snrs = np.empty((n_rows, len(photo_cols)), float) if pm_cols is not None: if len(pm_cols) == 2: pms = np.empty((n_rows, len(pm_cols)), float) else: pms = np.empty((n_rows, len(pm_cols)-1), float) pm_epochs = np.empty(n_rows, object) if process_uncerts: mn_sigs = np.load(f'{astro_sig_fits_filepath}/mn_sigs_array.npy') mn_coords = np.empty((len(mn_sigs), 2), float) # Check the shape of mn_sigs for a third dimension which signals that # we need to perform per-band parameterisation. if len(mn_sigs.shape) == 3: per_band_param = True mn_coords[:, 0] = np.copy(mn_sigs[:, 0, 2]) mn_coords[:, 1] = np.copy(mn_sigs[:, 0, 3]) else: per_band_param = False mn_coords[:, 0] = np.copy(mn_sigs[:, 2]) mn_coords[:, 1] = np.copy(mn_sigs[:, 3]) if cat_in_radec and not mn_in_radec: # Convert mn_coords to RA/Dec if catalogue is in Equatorial coords. a = SkyCoord(l=mn_coords[:, 0], b=mn_coords[:, 1], unit='deg', frame='galactic') mn_coords[:, 0] = a.icrs.ra.degree mn_coords[:, 1] = a.icrs.dec.degree if not cat_in_radec and mn_in_radec: # Convert mn_coords to l/b if catalogue is in Galactic coords. a = SkyCoord(ra=mn_coords[:, 0], dec=mn_coords[:, 1], unit='deg', frame='icrs') mn_coords[:, 0] = a.galactic.l.degree mn_coords[:, 1] = a.galactic.b.degree used_cols = np.concatenate((astro_cols, photo_cols, [bestindex_col])) if chunk_overlap_col is not None: used_cols = np.concatenate((used_cols, [chunk_overlap_col])) if snr_cols is not None: used_cols = np.concatenate((used_cols, snr_cols)) if pm_cols is not None: used_cols = np.concatenate((used_cols, pm_cols)) used_cols = np.sort(used_cols) new_astro_cols = np.array([np.where(used_cols == a)[0][0] for a in astro_cols]) new_photo_cols = np.array([np.where(used_cols == a)[0][0] for a in photo_cols]) new_bestindex_col = np.where(used_cols == bestindex_col)[0][0] if chunk_overlap_col is not None: new_chunk_overlap_col = np.where(used_cols == chunk_overlap_col)[0][0] if snr_cols is not None: new_snr_cols = np.array([np.where(used_cols == a)[0][0] for a in snr_cols]) if pm_cols is not None: new_pm_cols = np.array([np.where(used_cols == a)[0][0] for a in pm_cols]) n = 0 for chunk in pd.read_csv(input_filename, chunksize=100000, usecols=used_cols, header=None if not header else 0): best_index[n:n+chunk.shape[0]] = chunk.values[:, new_bestindex_col] if not process_uncerts: astro[n:n+chunk.shape[0]] = chunk.values[:, new_astro_cols] else: astro[n:n+chunk.shape[0], 0] = chunk.values[:, new_astro_cols[0]] astro[n:n+chunk.shape[0], 1] = chunk.values[:, new_astro_cols[1]] sig_mn_inds = mff.find_nearest_point(chunk.values[:, new_astro_cols[0]], chunk.values[:, new_astro_cols[1]], mn_coords[:, 0], mn_coords[:, 1]) if not per_band_param: # pylint: disable=possibly-used-before-assignment old_sigs = chunk.values[:, new_astro_cols[2]] # pylint: disable-next=possibly-used-before-assignment new_sigs = np.sqrt((mn_sigs[sig_mn_inds, 0]*old_sigs)**2 + mn_sigs[sig_mn_inds, 1]**2) else: new_sigs = np.empty(chunk.shape[0], float) for i in range(chunk.shape[0]): old_sig = chunk.values[i, new_astro_cols[2+best_index[i]]] new_sig = np.sqrt((mn_sigs[sig_mn_inds[i], best_index[i], 0]*old_sig)**2 + mn_sigs[sig_mn_inds[i], best_index[i], 1]**2) new_sigs[i] = new_sig astro[n:n+chunk.shape[0], 2] = new_sigs photo[n:n+chunk.shape[0]] = chunk.values[:, new_photo_cols] if chunk_overlap_col is not None: chunk_overlap[n:n+chunk.shape[0]] = chunk.values[:, new_chunk_overlap_col].astype(bool) else: chunk_overlap[n:n+chunk.shape[0]] = False if snr_cols is not None: snrs[n:n+chunk.shape[0]] = chunk.values[:, new_snr_cols] if pm_cols is not None: if len(pm_cols) == 2: pms[n:n+chunk.shape[0]] = chunk.values[:, new_pm_cols] else: pms[n:n+chunk.shape[0]] = chunk.values[:, new_pm_cols[:-1]] pm_epochs[n:n+chunk.shape[0]] = chunk.values[:, new_pm_cols[-1]] n += chunk.shape[0] if pm_cols is not None: # Since we made the temporary holding place for the individal array # epochs an object dtype, just extract values into a list quickly to # make astropy.time.Time happy later. pm_r_e = pm_ref_epoch if pm_ref_epoch is not None else list(pm_epochs) lon, lat = apply_proper_motion(astro[:, 0], astro[:, 1], pms[:, 0], pms[:, 1], pm_r_e, pm_move_to_epoch, 'equatorial' if cat_in_radec else 'galactic') astro[:, 0], astro[:, 1] = lon, lat if snr_cols is not None: return astro, photo, best_index, chunk_overlap, snrs return astro, photo, best_index, chunk_overlap
# pylint: disable-next=dangerous-default-value,too-many-locals,too-many-statements,dangerous-default-value
[docs] def npy_to_csv(input_csv_file_paths, cm, output_folder, output_filenames, column_name_lists, column_num_lists, extra_col_cat_names, correct_astro_flags, headers=[False, False], extra_col_name_lists=[None, None], extra_col_num_lists=[None, None], file_extension=''): ''' Function to convert output .npy files, as created during the cross-match process, and create a .csv file of matches and non-matches, combining columns from the original .csv catalogues. Parameters ---------- input_csv_file_paths : list of strings List of the locations in which the two respective .csv file catalogues are stored, including filename and extension. cm : Class ``CrossMatch`` class, containing all of the necessary arrays of derived information from a cross-match run to save out to files. output_folder : string Folder into which to save the resulting .csv output files. output_filenames : list of strings List of the names, including extensions, out to which to save merged datasets. column_name_lists : list of list or array of strings List containing two lists of strings, one per catalogue. Each inner list should contain the names of the columns in each respective catalogue to be included in the merged dataset -- its ID or designation, two orthogonal sky positions, and N magnitudes, as were originally used in the matching process, and likely used in ``csv_to_npy``. column_num_lists : list of list or array of integers List containing two lists or arrays of integers, one per catalogue, with the zero-index column integers corresponding to those columns listed in ``column_name_lists``. extra_col_cat_names : list of strings List of two strings, one per catalogue, indicating the names of the two catalogues, to append to the front of the derived contamination-based values included in the output datasets. correct_astro_flags : list of booleans Flags, in the same order as ``input_csv_file_paths``, for whether to save uncorrected and corrected astrometric uncertainties for each catalogue respectively. If a catalogue did not have its uncertainties processed, its entry should be False, so a case where both catalogues in a match had their uncertainties treated as given would be ``[False, False]``. headers : list of booleans, optional List of two boolmean flags, one per catalogue, indicating whether the original input .csv file for this catalogue had a header which provides names for each column on its first line, or whether its first line is the first line of the data. extra_col_name_lists : list of list or array of strings, or None, optional Should be a list of two lists of strings, one per catalogue. As with ``column_name_lists``, these should be names of columns from their respective catalogue in ``csv_filenames``, to be included in the output merged datasets. For a particular catalogue, if no extra columns should be included, put ``None`` in that entry. For example, to only include an extra single column ``Q`` for the second catalogue, ``extra_col_name_lists=[None, ['Q']]``. extra_col_num_lists : list of list or array of integer, or None, optional Should be a list of two lists of strings, analagous to ``column_num_lists``, providing the column indices for additional catalogue columns in the original .csv files to be included in the output datafiles. Like ``extra_col_name_lists``, for either catalogue ``None`` can be entered for no additional columns; for the above example we would use ``extra_col_num_lists=[None, [7]]``. file_extension : string, optional Additional string to insert into loaded cross-match-specific files (such as ``ac.npy``) and into saved files. Defaults to empty string, but should be given for cases of "with and without photometry" match runs, where a single cross-match run is used to create two separate match tables, and hence two separate output sets of .csv files. ''' # Need IDs/coordinates x2, mags (xN), then our columns: match probability, average # contaminant flux, eta/xi, and then M contaminant fractions for M relative fluxes. # TODO: un-hardcode number of relative contaminant fractions. pylint: disable=fixme # TODO: remove photometric likelihood when not used. pylint: disable=fixme our_columns = ['MATCH_P', 'SEPARATION', 'ETA', 'XI', f'{extra_col_cat_names[0]}_AVG_CONT', f'{extra_col_cat_names[1]}_AVG_CONT', f'{extra_col_cat_names[0]}_CONT_F1', f'{extra_col_cat_names[0]}_CONT_F10', f'{extra_col_cat_names[1]}_CONT_F1', f'{extra_col_cat_names[1]}_CONT_F10'] cols = np.append(np.append(column_name_lists[0], column_name_lists[1]), our_columns) for i, entry in zip([0, 1], ['1st', '2nd']): if ((extra_col_name_lists[i] is None and extra_col_num_lists[i] is not None) or (extra_col_name_lists[i] is not None and extra_col_num_lists[i] is None)): raise UserWarning("extra_col_name_lists and extra_col_num_lists either both " f"need to be None, or both need to not be None, for the {entry} " "catalogue.") if extra_col_num_lists[i] is not None: cols = np.append(cols, extra_col_name_lists[i]) ac = getattr(cm, f'ac{file_extension}') bc = getattr(cm, f'bc{file_extension}') p = getattr(cm, f'pc{file_extension}') eta = getattr(cm, f'eta{file_extension}') xi = getattr(cm, f'xi{file_extension}') a_avg_cont = getattr(cm, f'acontamflux{file_extension}') b_avg_cont = getattr(cm, f'bcontamflux{file_extension}') acontprob = getattr(cm, f'pacontam{file_extension}') bcontprob = getattr(cm, f'pbcontam{file_extension}') seps = getattr(cm, f'crptseps{file_extension}') if correct_astro_flags[0]: cols = np.append(cols, [f'{extra_col_cat_names[0]}_FIT_SIG']) a_concatastro = cm.a_astro if correct_astro_flags[1]: cols = np.append(cols, [f'{extra_col_cat_names[1]}_FIT_SIG']) b_concatastro = cm.b_astro n_amags, n_bmags = len(column_name_lists[0]) - 3, len(column_name_lists[1]) - 3 if extra_col_num_lists[0] is None: a_cols = column_num_lists[0] a_names = column_name_lists[0] else: a_cols = np.append(column_num_lists[0], extra_col_num_lists[0]).astype(int) a_names = np.append(column_name_lists[0], extra_col_name_lists[0]) if extra_col_num_lists[1] is None: b_cols = column_num_lists[1] b_names = column_name_lists[1] else: b_cols = np.append(column_num_lists[1], extra_col_num_lists[1]).astype(int) b_names = np.append(column_name_lists[1], extra_col_name_lists[1]) a_names, b_names = np.array(a_names)[np.argsort(a_cols)], np.array(b_names)[np.argsort(b_cols)] cat_a = pd.read_csv(input_csv_file_paths[0], memory_map=True, header=None if not headers[0] else 0, usecols=a_cols, names=a_names) cat_b = pd.read_csv(input_csv_file_paths[1], memory_map=True, header=None if not headers[1] else 0, usecols=b_cols, names=b_names) n_matches = len(ac) match_df = pd.DataFrame(columns=cols, index=np.arange(0, n_matches)) for i in column_name_lists[0]: match_df.loc[:, i] = cat_a.loc[ac, i].values for i in column_name_lists[1]: match_df.loc[:, i] = cat_b.loc[bc, i].values match_df.iloc[:, 6+n_amags+n_bmags] = p match_df.iloc[:, 6+n_amags+n_bmags+1] = seps match_df.iloc[:, 6+n_amags+n_bmags+2] = eta match_df.iloc[:, 6+n_amags+n_bmags+3] = xi match_df.iloc[:, 6+n_amags+n_bmags+4] = a_avg_cont match_df.iloc[:, 6+n_amags+n_bmags+5] = b_avg_cont for i in range(acontprob.shape[0]): match_df.iloc[:, 6+n_amags+n_bmags+6+i] = acontprob[i, :] for i in range(bcontprob.shape[0]): match_df.iloc[:, 6+n_amags+n_bmags+6+acontprob.shape[0]+i] = bcontprob[i, :] if extra_col_name_lists[0] is not None: for i in extra_col_name_lists[0]: match_df.loc[:, i] = cat_a.loc[ac, i].values if extra_col_name_lists[1] is not None: for i in extra_col_name_lists[1]: match_df.loc[:, i] = cat_b.loc[bc, i].values # FIT_SIG are the last 0-2 columns, after [ID+coords(x2)+mag(xN)]x2 + # Q match-made columns, plus len(extra_col_name_lists)x2. if correct_astro_flags[0]: ind = (len(column_name_lists[0]) + len(column_name_lists[1]) + len(our_columns) + (len(extra_col_name_lists[0]) if extra_col_name_lists[0] is not None else 0) + (len(extra_col_name_lists[1]) if extra_col_name_lists[1] is not None else 0)) match_df.iloc[:, ind] = a_concatastro[ac, 2] if correct_astro_flags[1]: # Here we also need to check if catalogue "a" has processed uncertainties too. _dx = 1 if correct_astro_flags[0] else 0 ind = (len(column_name_lists[0]) + len(column_name_lists[1]) + len(our_columns) + (len(extra_col_name_lists[0]) if extra_col_name_lists[0] is not None else 0) + (len(extra_col_name_lists[1]) if extra_col_name_lists[1] is not None else 0)) + _dx match_df.iloc[:, ind] = b_concatastro[bc, 2] if file_extension == '': _output_filename = output_filenames[0] else: # Insert the file_extension keyword into the middle of # /path/to/file/foo.bar. f, f_ext = os.path.splitext(output_filenames[0]) _output_filename = f + file_extension + f_ext match_df.to_csv(f'{output_folder}/{_output_filename}', encoding='utf-8', index=False, header=False) # For non-match, ID/coordinates/mags, then island probability + average # contamination. af = getattr(cm, f'af{file_extension}') a_avg_cont = getattr(cm, f'afieldflux{file_extension}') p = getattr(cm, f'pfa{file_extension}') seps = getattr(cm, f'afieldseps{file_extension}') afeta = getattr(cm, f'afieldeta{file_extension}') afxi = getattr(cm, f'afieldxi{file_extension}') our_columns = ['MATCH_P', 'NNM_SEPARATION', 'NNM_ETA', 'NNM_XI', f'{extra_col_cat_names[0]}_AVG_CONT'] cols = np.append(column_name_lists[0], our_columns) if extra_col_num_lists[0] is not None: cols = np.append(cols, extra_col_name_lists[0]) if correct_astro_flags[0]: cols = np.append(cols, [f'{extra_col_cat_names[0]}_FIT_SIG']) a_concatastro = cm.a_astro n_anonmatches = len(af) a_nonmatch_df = pd.DataFrame(columns=cols, index=np.arange(0, n_anonmatches)) for i in column_name_lists[0]: a_nonmatch_df.loc[:, i] = cat_a.loc[af, i].values a_nonmatch_df.iloc[:, 3+n_amags] = p a_nonmatch_df.iloc[:, 3+n_amags+1] = seps a_nonmatch_df.iloc[:, 3+n_amags+2] = afeta a_nonmatch_df.iloc[:, 3+n_amags+3] = afxi a_nonmatch_df.iloc[:, 3+n_amags+4] = a_avg_cont if extra_col_name_lists[0] is not None: for i in extra_col_name_lists[0]: a_nonmatch_df.loc[:, i] = cat_a.loc[af, i].values if correct_astro_flags[0]: ind = (len(column_name_lists[0]) + len(our_columns) + (len(extra_col_name_lists[0]) if extra_col_name_lists[0] is not None else 0)) a_nonmatch_df.iloc[:, ind] = a_concatastro[af, 2] if file_extension == '': _output_filename = output_filenames[1] else: # Insert the file_extension keyword into the middle of # /path/to/file/foo.bar. f, f_ext = os.path.splitext(output_filenames[1]) _output_filename = f + file_extension + f_ext a_nonmatch_df.to_csv(f'{output_folder}/{_output_filename}', encoding='utf-8', index=False, header=False) bf = getattr(cm, f'bf{file_extension}') b_avg_cont = getattr(cm, f'bfieldflux{file_extension}') p = getattr(cm, f'pfb{file_extension}') seps = getattr(cm, f'bfieldseps{file_extension}') bfeta = getattr(cm, f'bfieldeta{file_extension}') bfxi = getattr(cm, f'bfieldxi{file_extension}') our_columns = ['MATCH_P', 'NNM_SEPARATION', 'NNM_ETA', 'NNM_XI', f'{extra_col_cat_names[1]}_AVG_CONT'] cols = np.append(column_name_lists[1], our_columns) if extra_col_num_lists[1] is not None: cols = np.append(cols, extra_col_name_lists[1]) if correct_astro_flags[1]: cols = np.append(cols, [f'{extra_col_cat_names[1]}_FIT_SIG']) b_concatastro = cm.b_astro n_bnonmatches = len(bf) b_nonmatch_df = pd.DataFrame(columns=cols, index=np.arange(0, n_bnonmatches)) for i in column_name_lists[1]: b_nonmatch_df.loc[:, i] = cat_b.loc[bf, i].values b_nonmatch_df.iloc[:, 3+n_bmags] = p b_nonmatch_df.iloc[:, 3+n_bmags+1] = seps b_nonmatch_df.iloc[:, 3+n_bmags+2] = bfeta b_nonmatch_df.iloc[:, 3+n_bmags+3] = bfxi b_nonmatch_df.iloc[:, 3+n_bmags+4] = b_avg_cont if extra_col_name_lists[1] is not None: for i in extra_col_name_lists[1]: b_nonmatch_df.loc[:, i] = cat_b.loc[bf, i].values if correct_astro_flags[1]: ind = (len(column_name_lists[1]) + len(our_columns) + (len(extra_col_name_lists[1]) if extra_col_name_lists[1] is not None else 0)) b_nonmatch_df.iloc[:, ind] = b_concatastro[bf, 2] if file_extension == '': _output_filename = output_filenames[2] else: # Insert the file_extension keyword into the middle of # /path/to/file/foo.bar. f, f_ext = os.path.splitext(output_filenames[2]) _output_filename = f + file_extension + f_ext b_nonmatch_df.to_csv(f'{output_folder}/{_output_filename}', encoding='utf-8', index=False, header=False)
[docs] def rect_slice_csv(input_folder, output_folder, input_filename, output_filename, rect_coords, padding, astro_cols, mem_chunk_num, header=False): ''' Convenience function to take a small rectangular slice of a larger .csv catalogue, based on its given orthogonal sky coordinates in the large catalogue. Parameters ---------- input_folder : string Folder in which the larger .csv catalogue is stored. output_folder : string Folder into which to save the cutout catalogue. input_filename : string Name, including extension, of the larger catalogue file. output_filename : string Name, including extension, of the cutout catalogue. rect_coords : list or array of floats List of coordinates inside which to take the subset catalogue. Should be of the kind [lower_ax1, upper_ax1, lower_ax2, upper_ax2], where ax1 is e.g. Right Ascension and ax2 is e.g. Declination. padding : float Amount of additional on-sky area to permit outside of ``rect_coords``. In these cases sources must be within ``padding`` distance of the rectangle defined by ``rect_coords`` by the Haversine formula. astro_cols : list or array of integers List of zero-index columns representing the orthogonal sky axes, in the sense of [ax1, ax2], with ax1 being e.g. Galactic Longitude and ax2 being e.g. Galactic Latitude, as with ``rect_coords``. mem_chunk_num : integer Integer representing the number of sub-slices of the catalogue to load, in cases where the larger file is larger than available memory. header : boolean, optional Flag indicating whether .csv file has a header line, giving names of each column, or if the first line of the file is the first line of the dataset. ''' with open(f'{input_folder}/{input_filename}', encoding='utf-8') as fp: n_rows = 0 if not header else -1 for _ in fp: n_rows += 1 small_astro = np.empty((n_rows, 2), float) n = 0 for chunk in pd.read_csv(f'{input_folder}/{input_filename}', chunksize=100000, usecols=astro_cols, header=None if not header else 0): small_astro[n:n+chunk.shape[0]] = chunk.values n += chunk.shape[0] sky_cut = _load_rectangular_slice(small_astro, rect_coords[0], rect_coords[1], rect_coords[2], rect_coords[3], padding) n_inside_rows = 0 for cnum in range(0, mem_chunk_num): lowind = np.floor(n_rows*cnum/mem_chunk_num).astype(int) highind = np.floor(n_rows*(cnum+1)/mem_chunk_num).astype(int) n_inside_rows += np.sum(sky_cut[lowind:highind]) df_orig = pd.read_csv(f'{input_folder}/{input_filename}', nrows=1, header=None if not header else 0) df = pd.DataFrame(columns=df_orig.columns, index=np.arange(0, n_inside_rows)) counter = 0 outer_counter = 0 chunksize = 100000 for chunk in pd.read_csv(f'{input_folder}/{input_filename}', chunksize=chunksize, header=None if not header else 0): inside_n = np.sum(sky_cut[outer_counter:outer_counter+chunksize]) df.iloc[counter:counter+inside_n] = chunk.values[ sky_cut[outer_counter:outer_counter+chunksize]] counter += inside_n outer_counter += chunksize df.to_csv(f'{output_folder}/{output_filename}', encoding='utf-8', index=False, header=False)
[docs] def rect_slice_npy(input_folder, output_folder, rect_coords, padding, mem_chunk_num): ''' Convenience function to take a small rectangular slice of a larger catalogue, represented by three or four binary .npy files, based on its given orthogonal sky coordinates in the large catalogue. Parameters ---------- input_folder : string Folder in which the larger .npy files representing the catalogue are stored. output_folder : string Folder into which to save the cutout catalogue .npy files. rect_coords : list or array of floats List of coordinates inside which to take the subset catalogue. Should be of the kind [lower_ax1, upper_ax1, lower_ax2, upper_ax2], where ax1 is e.g. Right Ascension and ax2 is e.g. Declination. padding : float Amount of additional on-sky area to permit outside of ``rect_coords``. In these cases sources must be within ``padding`` distance of the rectangle defined by ``rect_coords`` by the Haversine formula. astro_cols : list or array of integers List of zero-index columns representing the orthogonal sky axes, in the sense of [ax1, ax2], with ax1 being e.g. Galactic Longitude and ax2 being e.g. Galactic Latitude, as with ``rect_coords``. mem_chunk_num : integer Integer representing the number of sub-slices of the catalogue to load, in cases where the larger file is larger than available memory. ''' astro = np.load(f'{input_folder}/con_cat_astro.npy', mmap_mode='r') photo = np.load(f'{input_folder}/con_cat_photo.npy', mmap_mode='r') best_index = np.load(f'{input_folder}/magref.npy', mmap_mode='r') n_rows = len(astro) sky_cut = _load_rectangular_slice(astro, rect_coords[0], rect_coords[1], rect_coords[2], rect_coords[3], padding) n_inside_rows = 0 for cnum in range(0, mem_chunk_num): lowind = np.floor(n_rows*cnum/mem_chunk_num).astype(int) highind = np.floor(n_rows*(cnum+1)/mem_chunk_num).astype(int) n_inside_rows += int(np.sum(sky_cut[lowind:highind])) small_astro = open_memmap(f'{output_folder}/con_cat_astro.npy', mode='w+', dtype=float, shape=(n_inside_rows, 3)) small_photo = open_memmap(f'{output_folder}/con_cat_photo.npy', mode='w+', dtype=float, shape=(n_inside_rows, photo.shape[1])) small_best_index = open_memmap(f'{output_folder}/magref.npy', mode='w+', dtype=int, shape=(n_inside_rows,)) small_chunk_overlap = open_memmap(f'{output_folder}/in_chunk_overlap.npy', mode='w+', dtype=bool, shape=(n_inside_rows,)) counter = 0 for cnum in range(0, mem_chunk_num): lowind = np.floor(n_rows*cnum/mem_chunk_num).astype(int) highind = np.floor(n_rows*(cnum+1)/mem_chunk_num).astype(int) inside_n = np.sum(sky_cut[lowind:highind]) small_astro[counter:counter+inside_n] = astro[lowind:highind][ sky_cut[lowind:highind]] small_photo[counter:counter+inside_n] = photo[lowind:highind][ sky_cut[lowind:highind]] small_best_index[counter:counter+inside_n] = best_index[lowind:highind][ sky_cut[lowind:highind]] # Always assume that a cutout is a single "visit" with no chunk "halo". small_chunk_overlap[counter:counter:inside_n] = False counter += inside_n
def apply_proper_motion(lon, lat, pm_lon, pm_lat, ref_epoch, move_to_epoch, coord_system): ''' Functionality to apply proper motions to a dataset with measured on-sky stellar drift. Parameters ---------- lon : numpy.ndarray or list of floats Longitude coordinate of each object. Should either be Right Ascension or galactic longitude, depending on ``coord_system``. lat: numpy.ndarray or list of floats Declination or galactic latitude (depending on ``coord_system``) of each object to be moved to a new epoch. pm_lon : numpy.ndarray or list of floats Longitudinal drift of each object, corresponding element by element to ``lon`` and ``lat``. Must be in units of arcseconds per year, and account for the latitudinal projection effect (e.g., the "cos dec" effect) already. pm_lat : numpy.ndarray or list of floats The latitudinal drift of the objects, in arcseconds per year. ref_epoch : numpy.ndarray of strings or string The date, or dates, of all observations. Can either be a single value or an array/list of epochs, element-wise with ``lon`` et al. Must be accepted by ``SkyCoord`` as a valid format, such as ``JXXXX.YYY` or ``YYYY-MM-DD``. move_to_epoch : string The new date to which all sources should have their positions moved to. Must be a valid astropy ``Time`` format, expecting either ``JXXXX.YYY`` or ``YYYY-MM-DD`. coord_system : string String to determine which coordinate system the data are in, either ``equatorial``, in which case the ICRS frame is used internally, or ``galactic``, where the galactic coordinate system will be applied. Returns ------- new_lon : numpy.ndarray of floats The new longitudinal coordinates of each object in ``move_to_epoch`` dates. new_lat : numpy.ndarray of floats Latitude (Dec or b) of objects at the new epoch. ''' lon, lat = np.array(lon), np.array(lat) pm_lon, pm_lat = np.array(pm_lon), np.array(pm_lat) q = ~np.isnan(pm_lon) & ~np.isnan(pm_lat) if not isinstance(ref_epoch, str): ref_epoch = np.array(ref_epoch)[q] time_ref_epoch = [Time(x) for x in ref_epoch] else: time_ref_epoch = Time(ref_epoch) # pylint: disable=no-member if coord_system == 'galactic': c = SkyCoord(l=lon[q] * u.degree, b=lat[q] * u.degree, frame='galactic', obstime=time_ref_epoch, pm_l_cosb=pm_lon[q] * u.arcsecond / u.year, pm_b=pm_lat[q] * u.arcsecond / u.year) else: c = SkyCoord(ra=lon[q] * u.degree, dec=lat[q] * u.degree, frame='icrs', obstime=time_ref_epoch, pm_ra_cosdec=pm_lon[q] * u.arcsecond / u.year, pm_dec=pm_lat[q] * u.arcsecond / u.year) # pylint: enable=no-member d = c.apply_space_motion(new_obstime=Time(move_to_epoch)) new_lon, new_lat = np.empty(len(lon), float), np.empty(len(lat), float) if coord_system == 'galactic': new_lon[q], new_lat[q] = d.l.degree, d.b.degree else: new_lon[q], new_lat[q] = d.ra.degree, d.dec.degree new_lon[~q], new_lat[~q] = lon[~q], lat[~q] return new_lon, new_lat