Module deeplenstronomy.utils

Helper functions and classes utilized internally.

Expand source code
"""Helper functions and classes utilized internally."""

import os
import sys
import yaml

from astropy.io import fits
import numpy as np
import pandas as pd
from scipy.interpolate import LinearNDInterpolator, interp1d

def dict_select(input_dict, keys):
    """
    Build a new dictionary holding only the requested keys.

    Every requested key must be present in input_dict; a missing key
    raises a KeyError.

    Args:
        input_dict (dict): the dictionary to pull entries from
        keys (List): the keys to keep in the result

    Returns:
        dict mapping each requested key to its value in input_dict
    """
    selected = {}
    for key in keys:
        selected[key] = input_dict[key]
    return selected

def dict_select_choose(input_dict, keys):
    """
    Build a new dictionary from the requested keys that exist in input_dict.

    Requested keys that are absent from input_dict are silently skipped.

    Args:
        input_dict (dict): the dictionary to pull entries from
        keys (List): the keys to keep in the result, when present

    Returns:
        dict mapping each requested-and-present key to its value
    """
    selected = {}
    for key in keys:
        if key in input_dict:
            selected[key] = input_dict[key]
    return selected

def select_params(input_dict, profile_prefix):
    """
    Collect the parameters belonging to one profile.

    Keys that begin with profile_prefix are kept, except those ending in
    'NAME'. Each kept key is shortened to the text after its last '-'.

    Args:
        input_dict (dict): the dictionary to search
        profile_prefix (str): i.e. "PLANE_1-OBJECT_2-LIGHT_PROFILE_1-"

    Returns:
        parameter dictionary for profile
    """
    profile_params = {}
    for full_key, value in input_dict.items():
        if not full_key.startswith(profile_prefix):
            continue
        # The profile's NAME entry is not a parameter
        if full_key.endswith('NAME'):
            continue
        profile_params[full_key.split('-')[-1]] = value
    return profile_params


class KeyPathDict(dict):
    """
    A dict subclass that can list its nested keys as keypath strings.
    Original code is from the python-benedict module
    https://github.com/fabiocaccamo/python-benedict
    [Copyright (c) 2019 Fabio Caccamo, under the MIT license].
    """
    def __init__(self, base_dict, keypath_separator='.'):
        """
        Wrap an existing dictionary and record the key lists of all of its
        nested keys.

        Args:
            base_dict (dict): the dictionary to add keypaths to
            keypath_separator (str, optional, default='.'): the character to use to separate keys
        """
        # Copy the top-level entries of base_dict into this dict
        super().__init__(base_dict)

        self.keypath_separator = keypath_separator
        # Key lists are collected once, here, from base_dict
        self.kls = self._keylists(base_dict)

    def _get_keylist(self, item, parent_keys):
        """
        Recursively collect the key lists of a dictionary and of every
        dictionary nested inside it.

        Args:
            item (dict): the dictionary to walk
            parent_keys (list): the keys leading from the top level down to item

        Returns:
            list of key lists, each one the full sequence of keys to an entry
        """
        collected = []
        for key, value in item.items():
            path = parent_keys + [key]
            collected.append(path)
            # Descend into nested dictionaries right after their own entry
            if isinstance(value, dict):
                collected.extend(self._get_keylist(value, path))
        return collected

    def _keylists(self, d):
        """
        Start the recursive key search at the top level of a dictionary.

        Args:
            d (dict): the dictionary to search

        Returns:
            nested list of all keys in the dictionary
        """
        return self._get_keylist(d, [])

    def keypaths(self):
        """
        Join each recorded key list into a string using the keypath_separator.

        Returns:
            sorted list of all keypaths in the dictionary as strings
        """
        separator = self.keypath_separator
        return sorted(separator.join(format(key) for key in keylist) for keylist in self.kls)

def read_distribution_file(filename):
    """
    Load a whitespace-delimited distribution table into a pandas dataframe.

    Args:
        filename (str): the file containing the distribution

    Returns:
        pandas.DataFrame containing the tabular distribution

    Raises:
        AssertionError: if "WEIGHT" is not one of the column names
    """
    # sep=r'\s+' is the supported spelling of the deprecated
    # delim_whitespace=True option
    df = pd.read_csv(filename, sep=r'\s+')

    # Raise explicitly instead of using an assert statement so the check
    # still runs under `python -O`; the exception type is unchanged
    if 'WEIGHT' not in df.columns:
        raise AssertionError("'WEIGHT' must be a column in {}".format(filename))

    return df
        

def draw_from_user_dist(filename, size, mode, step=10):
    """
    Interpolate a user-specified N-dimensional probability distribution and
    sample from it.

    Args:
        filename (str): the file containing the distribution
        size (int): the number of times to sample the probability distribution
        mode (str): choose from ['interpolate', 'sample']
        step (int): the number of steps along each parameter on the interpolation grid

    Returns:
        parameters: list, the names of the parameters
        choices: array of drawn parameter values. Holds one row per draw,
            except in 'interpolate' mode with a single parameter, where a
            flat array of drawn values is returned.

    Raises:
        NotImplementedError: if a mode other than "sample" or "interpolate" is passed
        ValueError: if mode is "interpolate" and the file has no column other than "WEIGHT"
    """

    df = read_distribution_file(filename)

    parameters = [x for x in df.columns if x != 'WEIGHT']
    points = df[parameters].values
    weights = df['WEIGHT'].values

    if mode == 'interpolate':
        if not parameters:
            # Without this guard neither branch below assigns `choices` and
            # the function dies with an UnboundLocalError at the return
            raise ValueError("{} must contain at least one parameter column to interpolate".format(filename))

        if len(parameters) > 1:
            # 2+ Dimension case
            # Interpolate the distribution and evaluate it on a grid of all possible parameter combinations
            interpolator = LinearNDInterpolator(points, weights, fill_value=0.0)
            grid_vectors = [np.linspace(df[x].values.min(), df[x].values.max(), step) for x in parameters]
            param_grids = np.array(np.meshgrid(*grid_vectors)).T.reshape(step**len(parameters), len(parameters))
            weighted_params = interpolator(param_grids)

            # Draw from the grid based on its weight
            draws = np.random.choice(np.arange(len(param_grids)), size=size, p=weighted_params/weighted_params.sum())
            choices = param_grids[draws]

        else:
            # Interpolate the 1D grid
            grid = np.linspace(points.min(), points.max(), step)
            # fill_value only takes effect when bounds_error is False
            interpolator = interp1d(points.flatten(), weights, bounds_error=False, fill_value=0.0)
            weighted_params = interpolator(grid)

            # Draw from the grid based on its weight
            choices = np.random.choice(grid, size=size, p=weighted_params/weighted_params.sum())

    elif mode == 'sample':
        # Draw rows of the table directly, with probability proportional to WEIGHT
        index_arr = np.random.choice(np.arange(len(points), dtype=int), size=size, p=weights / weights.sum())
        choices = points[index_arr]

    else:
        raise NotImplementedError("unexpected mode passed, must be 'sample' or 'interpolate'")

    return parameters, choices

def read_images(im_dir, im_size, bands):
    """
    Read images into memory and resize to match simulations.

    One file named <band>.fits is read from im_dir for every band. The last
    two axes of the stacked array are then zero-padded or centre-cropped so
    that each is exactly im_size long.

    Args:
        im_dir (str): path to directory of images
        im_size (int): numPix along one side of an image
        bands (List[str]): list of bands used in simulation

    Returns:
        array of processed images, with the band axis swapped into position 1

    Note:
        If a band file is missing, a message is printed and the process
        exits via sys.exit().
    """
    # Load images into an array
    im_array = []
    for band in bands:
        band_file = os.path.join(im_dir, band + '.fits')
        if not os.path.exists(band_file):
            print(im_dir + " is missing " + band + ".fits")
            sys.exit()

        # The context manager closes the file even if reading the data fails
        with fits.open(band_file) as hdu:
            im_array.append(hdu[0].data)

    # presumably each file holds a stack of images, giving
    # (image, band, row, column) after the swap -- TODO confirm
    im_array = np.swapaxes(np.array(im_array), 0, 1)

    # Resize the images to match the simulations
    for axis in (-1, -2):
        deficit = im_size - im_array.shape[axis]

        if deficit > 0:
            # Pad with zeros by exactly the missing amount; the extra pixel
            # of an odd deficit goes on the trailing side
            pad_before = deficit // 2
            pad_width = [(0, 0)] * im_array.ndim
            pad_width[axis] = (pad_before, deficit - pad_before)
            im_array = np.pad(im_array, pad_width, mode='constant', constant_values=0.0)

        elif deficit < 0:
            # Crop a centred window; the extra pixel of an odd excess is
            # removed from the trailing side
            start = (-deficit) // 2
            window = [slice(None)] * im_array.ndim
            window[axis] = slice(start, start + im_size)
            im_array = im_array[tuple(window)]

    return im_array

def organize_image_backgrounds(im_dir, image_bank_size, config_dicts, configuration):
    """
    Sort image files based on map. If no map exists, sort randomly.

    The map is the whitespace-delimited file im_dir/map.txt. A column whose
    name starts with 'CONFIGURATION' is only considered when the text before
    its first '-' equals `configuration`; the remainder of its name is then
    looked up in the config dicts. Any other column is looked up by its full
    name. Each config_dict is matched to the map row with the smallest summed
    absolute difference over the usable columns.

    Args:
        im_dir (str): path to directory of images
        image_bank_size (int): number of images in user-specified bank
        config_dicts (List[dict]): list of config_dicts
        configuration (str): the configuration currently running

    Returns:
        the indices of the images utilized for each config_dict
    """
    map_columns = []
    map_file = os.path.join(im_dir, 'map.txt')
    if os.path.exists(map_file):
        # Read the map (sep=r'\s+' replaces the deprecated delim_whitespace=True)
        df = pd.read_csv(map_file, sep=r'\s+')

        # Trim to just the columns in the config dict
        known_params = config_dicts[0]
        map_columns, bad_columns = [], []
        for x in df.columns:
            if x.startswith('CONFIGURATION'):
                split_x = x.split('-')
                # Columns addressed to another configuration are skipped silently
                if split_x[0] != configuration:
                    continue
                name = '-'.join(split_x[1:])
            else:
                name = x

            if name in known_params:
                map_columns.append(x)
            else:
                bad_columns.append(name)

        if len(bad_columns) != 0:
            print(config_dicts[0].keys())
            print("WARNING {0} are not found in the simulated dataset for {1}".format(', '.join(bad_columns), configuration) +
                  ". You may see unexpected results. Use the dataset.search(<param_name>) function to find the correct column names.")

    if len(map_columns) == 0:
        # Sort randomly
        return np.random.choice(np.arange(image_bank_size), replace=True, size=len(config_dicts))

    # Trim df to just the columns needed; the extra axis sets up broadcasting
    # of every map row against every config_dict
    map_param_array = df[map_columns].values[:, np.newaxis]

    # Names under which each map column is stored in the config dicts
    param_names = ['-'.join(x.split('-')[1:]) if x.startswith('CONFIGURATION') else x for x in map_columns]
    im_param_array = np.array([[config_dict[name] for name in param_names] for config_dict in config_dicts])

    # divide by stds to put parameters on same footing; stds below 1 are
    # replaced by 1
    im_stds = np.std(im_param_array, axis=0)[np.newaxis, :]
    im_stds = np.where(im_stds < 1.0, 1.0, im_stds)

    # Find the closest image to each parameter combination in map_param_array
    image_indices = np.argmin(np.sum(np.abs(im_param_array - map_param_array) / im_stds, axis=2), axis=0)

    return image_indices
    

def read_cadence_file(filename):
    """
    Load a cadence file written in YAML.

    Args:
        filename (str): Name of cadence file

    Returns:
        cadence_dict: dictionary containing cadence file contents, with
            'REFERENCE_MJD' set to 0 when the file does not define it
    """
    with open(filename, 'r') as cadence_file:
        cadence_dict = yaml.safe_load(cadence_file)

    # Fall back to the default reference mjd of 0
    has_reference = 'REFERENCE_MJD' in cadence_dict
    if not has_reference:
        cadence_dict['REFERENCE_MJD'] = 0

    return cadence_dict

Functions

def dict_select(input_dict, keys)

Trim a dictionary down to selected keys. Requires presence of keys in input_dict.

Args

input_dict : dict
the dictionary to trim
keys : List
list of keys desired in the final dict

Returns

trimmed dictionary

Expand source code
def dict_select(input_dict, keys):
    """
    Build a new dictionary holding only the requested keys.

    Every requested key must be present in input_dict; a missing key
    raises a KeyError.

    Args:
        input_dict (dict): the dictionary to pull entries from
        keys (List): the keys to keep in the result

    Returns:
        dict mapping each requested key to its value in input_dict
    """
    selected = {}
    for key in keys:
        selected[key] = input_dict[key]
    return selected
def dict_select_choose(input_dict, keys)

Trim a dictionary down to selected keys, if they are in the dictionary.

Args

input_dict : dict
the dictionary to trim
keys : List
list of keys desired in the final dict

Returns

trimmed dictionary

Expand source code
def dict_select_choose(input_dict, keys):
    """
    Build a new dictionary from the requested keys that exist in input_dict.

    Requested keys that are absent from input_dict are silently skipped.

    Args:
        input_dict (dict): the dictionary to pull entries from
        keys (List): the keys to keep in the result, when present

    Returns:
        dict mapping each requested-and-present key to its value
    """
    selected = {}
    for key in keys:
        if key in input_dict:
            selected[key] = input_dict[key]
    return selected
def draw_from_user_dist(filename, size, mode, step=10)

Interpolate a user-specified N-dimensional probability distribution and sample from it.

Args

filename : str
the file containing the distribution
size : int
the number of times to sample the probability distribution
mode : str
choose from ['interpolate', 'sample']
step : int
the number of steps on the interpolation grid

Returns

parameters
list, the names of the parameters
choices
array with entries as arrays of drawn parameters

Raises

NotImplementedError
if a mode other than "sample" or "interpolate" is passed
Expand source code
def draw_from_user_dist(filename, size, mode, step=10):
    """
    Interpolate a user-specified N-dimensional probability distribution and
    sample from it.

    Args:
        filename (str): the file containing the distribution
        size (int): the number of times to sample the probability distribution
        mode (str): choose from ['interpolate', 'sample']
        step (int): the number of steps along each parameter on the interpolation grid

    Returns:
        parameters: list, the names of the parameters
        choices: array of drawn parameter values. Holds one row per draw,
            except in 'interpolate' mode with a single parameter, where a
            flat array of drawn values is returned.

    Raises:
        NotImplementedError: if a mode other than "sample" or "interpolate" is passed
        ValueError: if mode is "interpolate" and the file has no column other than "WEIGHT"
    """

    df = read_distribution_file(filename)

    parameters = [x for x in df.columns if x != 'WEIGHT']
    points = df[parameters].values
    weights = df['WEIGHT'].values

    if mode == 'interpolate':
        if not parameters:
            # Without this guard neither branch below assigns `choices` and
            # the function dies with an UnboundLocalError at the return
            raise ValueError("{} must contain at least one parameter column to interpolate".format(filename))

        if len(parameters) > 1:
            # 2+ Dimension case
            # Interpolate the distribution and evaluate it on a grid of all possible parameter combinations
            interpolator = LinearNDInterpolator(points, weights, fill_value=0.0)
            grid_vectors = [np.linspace(df[x].values.min(), df[x].values.max(), step) for x in parameters]
            param_grids = np.array(np.meshgrid(*grid_vectors)).T.reshape(step**len(parameters), len(parameters))
            weighted_params = interpolator(param_grids)

            # Draw from the grid based on its weight
            draws = np.random.choice(np.arange(len(param_grids)), size=size, p=weighted_params/weighted_params.sum())
            choices = param_grids[draws]

        else:
            # Interpolate the 1D grid
            grid = np.linspace(points.min(), points.max(), step)
            # fill_value only takes effect when bounds_error is False
            interpolator = interp1d(points.flatten(), weights, bounds_error=False, fill_value=0.0)
            weighted_params = interpolator(grid)

            # Draw from the grid based on its weight
            choices = np.random.choice(grid, size=size, p=weighted_params/weighted_params.sum())

    elif mode == 'sample':
        # Draw rows of the table directly, with probability proportional to WEIGHT
        index_arr = np.random.choice(np.arange(len(points), dtype=int), size=size, p=weights / weights.sum())
        choices = points[index_arr]

    else:
        raise NotImplementedError("unexpected mode passed, must be 'sample' or 'interpolate'")

    return parameters, choices
def organize_image_backgrounds(im_dir, image_bank_size, config_dicts, configuration)

Sort image files based on map. If no map exists, sort randomly.

Args

im_dir : str
path to directory of images
image_bank_size : int
number of images in user-specified bank
config_dicts : List[dict]
list of config_dicts
configuration : str
the configuration currently running

Returns

the indices of the images utilized for each config_dict

Expand source code
def organize_image_backgrounds(im_dir, image_bank_size, config_dicts, configuration):
    """
    Sort image files based on map. If no map exists, sort randomly.

    The map is the whitespace-delimited file im_dir/map.txt. A column whose
    name starts with 'CONFIGURATION' is only considered when the text before
    its first '-' equals `configuration`; the remainder of its name is then
    looked up in the config dicts. Any other column is looked up by its full
    name. Each config_dict is matched to the map row with the smallest summed
    absolute difference over the usable columns.

    Args:
        im_dir (str): path to directory of images
        image_bank_size (int): number of images in user-specified bank
        config_dicts (List[dict]): list of config_dicts
        configuration (str): the configuration currently running

    Returns:
        the indices of the images utilized for each config_dict
    """
    map_columns = []
    map_file = os.path.join(im_dir, 'map.txt')
    if os.path.exists(map_file):
        # Read the map (sep=r'\s+' replaces the deprecated delim_whitespace=True)
        df = pd.read_csv(map_file, sep=r'\s+')

        # Trim to just the columns in the config dict
        known_params = config_dicts[0]
        map_columns, bad_columns = [], []
        for x in df.columns:
            if x.startswith('CONFIGURATION'):
                split_x = x.split('-')
                # Columns addressed to another configuration are skipped silently
                if split_x[0] != configuration:
                    continue
                name = '-'.join(split_x[1:])
            else:
                name = x

            if name in known_params:
                map_columns.append(x)
            else:
                bad_columns.append(name)

        if len(bad_columns) != 0:
            print(config_dicts[0].keys())
            print("WARNING {0} are not found in the simulated dataset for {1}".format(', '.join(bad_columns), configuration) +
                  ". You may see unexpected results. Use the dataset.search(<param_name>) function to find the correct column names.")

    if len(map_columns) == 0:
        # Sort randomly
        return np.random.choice(np.arange(image_bank_size), replace=True, size=len(config_dicts))

    # Trim df to just the columns needed; the extra axis sets up broadcasting
    # of every map row against every config_dict
    map_param_array = df[map_columns].values[:, np.newaxis]

    # Names under which each map column is stored in the config dicts
    param_names = ['-'.join(x.split('-')[1:]) if x.startswith('CONFIGURATION') else x for x in map_columns]
    im_param_array = np.array([[config_dict[name] for name in param_names] for config_dict in config_dicts])

    # divide by stds to put parameters on same footing; stds below 1 are
    # replaced by 1
    im_stds = np.std(im_param_array, axis=0)[np.newaxis, :]
    im_stds = np.where(im_stds < 1.0, 1.0, im_stds)

    # Find the closest image to each parameter combination in map_param_array
    image_indices = np.argmin(np.sum(np.abs(im_param_array - map_param_array) / im_stds, axis=2), axis=0)

    return image_indices
def read_cadence_file(filename)

Parse a cadence file.

Args

filename : str
Name of cadence file

Returns

cadence_dict
dictionary containing cadence file contents
Expand source code
def read_cadence_file(filename):
    """
    Load a cadence file written in YAML.

    Args:
        filename (str): Name of cadence file

    Returns:
        cadence_dict: dictionary containing cadence file contents, with
            'REFERENCE_MJD' set to 0 when the file does not define it
    """
    with open(filename, 'r') as cadence_file:
        cadence_dict = yaml.safe_load(cadence_file)

    # Fall back to the default reference mjd of 0
    has_reference = 'REFERENCE_MJD' in cadence_dict
    if not has_reference:
        cadence_dict['REFERENCE_MJD'] = 0

    return cadence_dict
def read_distribution_file(filename)

Load the file information into a pandas dataframe

Args

filename : str
the file containing the distribution

Returns

pandas.DataFrame containing the tabular distribution

Raises

AssertionError
if "WEIGHT" is not one of the column names
Expand source code
def read_distribution_file(filename):
    """
    Load a whitespace-delimited distribution table into a pandas dataframe.

    Args:
        filename (str): the file containing the distribution

    Returns:
        pandas.DataFrame containing the tabular distribution

    Raises:
        AssertionError: if "WEIGHT" is not one of the column names
    """
    # sep=r'\s+' is the supported spelling of the deprecated
    # delim_whitespace=True option
    df = pd.read_csv(filename, sep=r'\s+')

    # Raise explicitly instead of using an assert statement so the check
    # still runs under `python -O`; the exception type is unchanged
    if 'WEIGHT' not in df.columns:
        raise AssertionError("'WEIGHT' must be a column in {}".format(filename))

    return df
def read_images(im_dir, im_size, bands)

Read images into memory and resize to match simulations.

Args

im_dir : str
path to directory of images
im_size : int
numPix along one side of an image
bands : List[str]
list of bands used in simulation

Returns

array of processed images

Expand source code
def read_images(im_dir, im_size, bands):
    """
    Read images into memory and resize to match simulations.

    One file named <band>.fits is read from im_dir for every band. The last
    two axes of the stacked array are then zero-padded or centre-cropped so
    that each is exactly im_size long.

    Args:
        im_dir (str): path to directory of images
        im_size (int): numPix along one side of an image
        bands (List[str]): list of bands used in simulation

    Returns:
        array of processed images, with the band axis swapped into position 1

    Note:
        If a band file is missing, a message is printed and the process
        exits via sys.exit().
    """
    # Load images into an array
    im_array = []
    for band in bands:
        band_file = os.path.join(im_dir, band + '.fits')
        if not os.path.exists(band_file):
            print(im_dir + " is missing " + band + ".fits")
            sys.exit()

        # The context manager closes the file even if reading the data fails
        with fits.open(band_file) as hdu:
            im_array.append(hdu[0].data)

    # presumably each file holds a stack of images, giving
    # (image, band, row, column) after the swap -- TODO confirm
    im_array = np.swapaxes(np.array(im_array), 0, 1)

    # Resize the images to match the simulations
    for axis in (-1, -2):
        deficit = im_size - im_array.shape[axis]

        if deficit > 0:
            # Pad with zeros by exactly the missing amount; the extra pixel
            # of an odd deficit goes on the trailing side
            pad_before = deficit // 2
            pad_width = [(0, 0)] * im_array.ndim
            pad_width[axis] = (pad_before, deficit - pad_before)
            im_array = np.pad(im_array, pad_width, mode='constant', constant_values=0.0)

        elif deficit < 0:
            # Crop a centred window; the extra pixel of an odd excess is
            # removed from the trailing side
            start = (-deficit) // 2
            window = [slice(None)] * im_array.ndim
            window[axis] = slice(start, start + im_size)
            im_array = im_array[tuple(window)]

    return im_array
def select_params(input_dict, profile_prefix)

Get just the parameters and values for a given profile prefix.

Args

input_dict : dict
the dictionary to search
profile_prefix : str
i.e. "PLANE_1-OBJECT_2-LIGHT_PROFILE_1-"

Returns

parameter dictionary for profile

Expand source code
def select_params(input_dict, profile_prefix):
    """
    Collect the parameters belonging to one profile.

    Keys that begin with profile_prefix are kept, except those ending in
    'NAME'. Each kept key is shortened to the text after its last '-'.

    Args:
        input_dict (dict): the dictionary to search
        profile_prefix (str): i.e. "PLANE_1-OBJECT_2-LIGHT_PROFILE_1-"

    Returns:
        parameter dictionary for profile
    """
    profile_params = {}
    for full_key, value in input_dict.items():
        if not full_key.startswith(profile_prefix):
            continue
        # The profile's NAME entry is not a parameter
        if full_key.endswith('NAME'):
            continue
        profile_params[full_key.split('-')[-1]] = value
    return profile_params

Classes

class KeyPathDict (base_dict, keypath_separator='.')

A subclass of dict to enable keypath functionality. Original code is from the python-benedict module https://github.com/fabiocaccamo/python-benedict [Copyright (c) 2019 Fabio Caccamo, under the MIT license].

Initialize a KeyPathDict by supplying the underlying dict to which adding keypath functionality is desired.

Args

base_dict : dict
the dictionary to add keypaths to

keypath_separator : str, optional, default='.'
the character to use to separate keys

Expand source code
class KeyPathDict(dict):
    """
    A dict subclass that can list its nested keys as keypath strings.
    Original code is from the python-benedict module
    https://github.com/fabiocaccamo/python-benedict
    [Copyright (c) 2019 Fabio Caccamo, under the MIT license].
    """
    def __init__(self, base_dict, keypath_separator='.'):
        """
        Wrap an existing dictionary and record the key lists of all of its
        nested keys.

        Args:
            base_dict (dict): the dictionary to add keypaths to
            keypath_separator (str, optional, default='.'): the character to use to separate keys
        """
        # Copy the top-level entries of base_dict into this dict
        super().__init__(base_dict)

        self.keypath_separator = keypath_separator
        # Key lists are collected once, here, from base_dict
        self.kls = self._keylists(base_dict)

    def _get_keylist(self, item, parent_keys):
        """
        Recursively collect the key lists of a dictionary and of every
        dictionary nested inside it.

        Args:
            item (dict): the dictionary to walk
            parent_keys (list): the keys leading from the top level down to item

        Returns:
            list of key lists, each one the full sequence of keys to an entry
        """
        collected = []
        for key, value in item.items():
            path = parent_keys + [key]
            collected.append(path)
            # Descend into nested dictionaries right after their own entry
            if isinstance(value, dict):
                collected.extend(self._get_keylist(value, path))
        return collected

    def _keylists(self, d):
        """
        Start the recursive key search at the top level of a dictionary.

        Args:
            d (dict): the dictionary to search

        Returns:
            nested list of all keys in the dictionary
        """
        return self._get_keylist(d, [])

    def keypaths(self):
        """
        Join each recorded key list into a string using the keypath_separator.

        Returns:
            sorted list of all keypaths in the dictionary as strings
        """
        separator = self.keypath_separator
        return sorted(separator.join(format(key) for key in keylist) for keylist in self.kls)

Ancestors

  • builtins.dict

Methods

def keypaths(self)

Join the keylists using the keypath_separator.

Returns: list of all keypaths in the dictionary as strings

Expand source code
def keypaths(self):
    """
    Join each recorded key list into a string using the keypath_separator.

    Returns:
        sorted list of all keypaths in the dictionary as strings
    """
    separator = self.keypath_separator
    return sorted(separator.join(format(key) for key in keylist) for keylist in self.kls)