Source code for sorts.population.population

#!/usr/bin/env python

'''Defines a population of space objects in the form of a class.

'''

#Python standard import
import copy
import pathlib

#Third party import
import h5py
import numpy as np
import pyorb
from tabulate import tabulate
from astropy.time import Time

#Local import
from .. import space_object as so
from .. import plotting
from .. import constants

[docs]class Population: '''Encapsulates a population of space objects as an array and functions for returning instances of space objects. **Default columns:** * 0: oid - Object ID * 1: a - Semi-major axis in m * 2: e - Eccentricity * 3: i - Inclination in degrees * 4: raan - Right Ascension of ascending node in degrees * 5: aop - Argument of perihelion in degrees * 6: mu0 - Mean anoamly in degrees * 7: mjd0 - Epoch of object given in Modified Julian Days Any column that is added will have its name used in initializing the Space object. A population's column data can also be accessed as a python dictionary or a table according to row number, e.g. .. code-block:: python #this returns all Right Ascension of ascending node as a numpy vector vector = my_population['raan'] #this gets row number 3 (since we use 0 indexing) row = my_population[2] but it is also configured to be able to try to convert to uniform type array and perform numpy like slices. If a column data type cannot be converted to the default data type numpy.nan is inserted instead. .. code-block:: python #This will convert the internal data structure to a uniform type array and select all rows and columns 4 and onwards. This is time-consuming on large populations as it actually copies to data. vector = my_population[:,4:] # This is significantly faster as a single column is easy to extract and no conversion is needed data_point = my_population[123,2] This indexing system can also be used for data manipulation: .. code-block:: python my_population['raan'] = vector my_population[2] = row my_population[:,4:] = matrix my_population[123,2] = 2.3 my_population[123,11] = 'test' Notice that in the above example the value to be assigned always has the correct size corresponding to the index and slices, a statement like :code:`x[:,3:7] = 3` is not possible, instead one would write :code:`x[:,3:7] = np.full((len(pop), 4), 3.0, dtype='f')`. #TODO: THESE HAVE CHANGED, UPDATE THEM!!!! :ivar numpy.ndarray objs: Array containing population data. Rows correspond to objects and columns to variables. :ivar str name: Name of population. :ivar list fields: List of strings containing column descriptions. :ivar list space_object_uses: List of booleans describing what columns should be included when initializing a space object. This allows for extra data to be stored in the population without passing it to the space object. :ivar PropagatorBase propagator: Propagator class pointer used for :class:`space_object.SpaceObject`. :ivar dict propagator_options: Propagator initialization keyword arguments. :param str name: Name of population. :param list fields: List of strings containing column descriptions for addition data besides the default columns. :param list dtypes: List of strings containing numpy data type description. Defaults to 'f'. :param list space_object_fields: List of booleans describing what columns should be included when initializing a space object. This allows for extra data to be stored in the population without passing it to the space object. :param PropagatorBase propagator: Propagator class pointer used for :class:`space_object.SpaceObject`. :param dict propagator_options: Propagator initialization keyword arguments. ''' _default_dtype = 'float64' _default_fields = ['oid','a','e','i','raan','aop','mu0','mjd0'] _default_state_fields = ['a','e','i','raan','aop','mu0'] _default_epoch = {'field': 'mjd0', 'format': 'mjd', 'scale': 'utc'}
[docs] def __init__(self, fields = None, dtypes = None, space_object_fields = None, state_fields = None, epoch_field = None, propagator = None, propagator_args = {}, propagator_options = {}, ): if fields is None: fields = copy.copy(Population._default_fields) self.fields = fields self.space_object_fields = space_object_fields if dtypes is None: self.dtypes = [Population._default_dtype]*len(self.fields) else: self.dtypes = dtypes if state_fields is None: state_fields = [] for key in Population._default_state_fields: if key in self.fields: state_fields.append(key) self.state_fields = state_fields if epoch_field is None: epoch_field = copy.deepcopy(Population._default_epoch) else: if isinstance(epoch_field, str): epoch_field_ = epoch_field epoch_field = copy.deepcopy(Population._default_epoch) epoch_field['field'] = epoch_field_ self.epoch_field = epoch_field self.allocate(0) self.propagator = propagator self.propagator_options = propagator_options self.propagator_args = propagator_args
def __len__(self): return(self.data.shape[0]) @property def out_frame(self): if 'settings' not in self.propagator_options: return None if 'out_frame' not in self.propagator_options['settings']: return None return self.propagator_options['settings']['out_frame'] @out_frame.setter def out_frame(self, val): if 'settings' not in self.propagator_options: self.propagator_options['settings'] = {} self.propagator_options['settings']['out_frame'] = val @property def in_frame(self): if 'settings' not in self.propagator_options: return None if 'in_frame' not in self.propagator_options['settings']: return None return self.propagator_options['settings']['in_frame'] @in_frame.setter def in_frame(self, val): if 'settings' not in self.propagator_options: self.propagator_options['settings'] = {} self.propagator_options['settings']['in_frame'] = val
[docs] def copy(self): '''Return a copy of the current Population instance. ''' pop = Population( fields = copy.deepcopy(self.fields), dtypes = copy.deepcopy(self.dtypes), space_object_fields = copy.deepcopy(self.space_object_fields), state_fields = copy.deepcopy(self.state_fields), epoch_field = copy.deepcopy(self.epoch_field), propagator = self.propagator, propagator_args = copy.deepcopy(self.propagator_args), propagator_options = copy.deepcopy(self.propagator_options), ) pop.data = self.data.copy() return pop
[docs] def delete(self, inds): '''Remove the rows according to the given indices. Supports single index, iterable of indices and slices. ''' if isinstance(inds, int): inds = [inds] elif isinstance(inds, slice): _inds = range(self.data.shape[0]) inds = _inds[inds] elif not (isinstance(inds, list) or isinstance(inds, np.ndarray)): raise Exception('Cannot delete indecies given with type {}'.format(type(inds))) mask = np.full( (self.data.shape[0],), True, dtype=np.bool) for ind in inds: mask[ind] = False self.data=self.data[ mask ]
[docs] def filter(self,col,fun): '''Filters the population using a boolean function, keeping true values. :param str col: Column to filter, must match exactly one entry in the :code:`header` attribute. :param function fun: Function that returns boolean array used for filtering. **Example:** Filter Master population keeping only objects below 45.0 degrees inclination. .. code-block:: python from population_library import master_catalog master = master_catalog() master.filter( col='i', fun=lambda inc: inc < 45.0, ) ''' if col in self.fields: ind = self.fields.index(col) mask = np.full( (self.data.shape[0],), True, dtype=np.bool) for row in range(self.data.shape[0]): mask[row] = fun(self.data[col][row]) self.data=self.data[ mask ] else: raise Exception('No such column: {}'.format(col))
@property def shape(self): '''This is the shape of the internal data matrix ''' shape = (self.data.shape[0],len(self.fields)) return shape
[docs] def allocate(self, length): '''Allocate the internal data array for assignment of objects. **Warning:** This removes all internal data. **Example:** Create a population with two objects. Here the :code:`load_data` function is a fictional function that creates a row with the needed data. .. code-block:: python from population import Population from my_data_loader import load_data my_pop = Population( name='two objects', extra_columns = ['m', 'color'], dtypes = ['Float64', 'U20'], space_object_uses = [True, False], ) print(len(my_pop)) #will output 0 my_pop.allocate(2) print(len(my_pop)) #will output 2 my_pop.data[0] = load_data('obj1') my_pop.data[1] = load_data('obj2') ''' _dtype = [] for name, dt, ind in zip(self.fields, self.dtypes, range(len(self.fields))): _dtype.append( (name, dt)) self.data = np.empty((length,), dtype=_dtype)
[docs] def get_states(self, n=None, named=True, dtype=None): '''Use the defined state parameters to get a copy of the states ''' return self.get_fields(fields = self.state_fields, n=n, named=named, dtype=dtype)
[docs] def get_fields(self, fields, n=None, named=True, dtype=None): '''Get the orbital elements for one row from internal data array. :param int/slice/list n: Row number(s). :param list fields: List of fields to get data for :param bool named: return a named numpy array or a unnamed one. If True, all dtypes are cast as the first fields. ''' if n is None: n = slice(None,None,None) #all states = self.data[n][fields] if not named: if dtype is None: dtype = states.dtype[0] states_ = np.empty((len(states), len(fields)), dtype=dtype) for ind, key in enumerate(states.dtype.names): states_[:,ind] = states[key].astype(dtype) states = states_ del states_ return states
[docs] def get_orbit(self, n, fields=None, M_cent=pyorb.M_earth, degrees=True, anomaly='mean'): '''Get the one row from the population as a :class:`pyorb.Orbit` instance. ''' if fields is None: fields = self.state_fields kwargs = {} for key in fields: kwargs[key] = self.data[n][key] #TODO: generalize this better if 'aop' in kwargs: kwargs['omega'] = kwargs.pop('aop') if 'raan' in kwargs: kwargs['Omega'] = kwargs.pop('raan') if 'mu0' in kwargs: kwargs['anom'] = kwargs.pop('mu0') for key in ['X', 'Y', 'Z', 'VX', 'VY', 'VZ']: if key in kwargs: kwargs[key.lower()] = kwargs.pop(key) obj=pyorb.Orbit( M0 = M_cent, degrees = degrees, type=anomaly, auto_update = True, direct_update = True, num = 1, **kwargs ) return obj
[docs] def get_object(self, n): '''Get the one row from the population as a :class:`space_object.SpaceObject` instance. ''' parameters = {} if self.space_object_fields is not None: for key in self.space_object_fields: parameters[key] = self.data[key][n] cart_state = True kep_state = True for key in pyorb.Orbit.CARTESIAN: if key not in self.state_fields: cart_state = False for key in ['a', 'e', 'i']: if key not in self.state_fields: kep_state = False if 'omega' not in self.state_fields and 'aop' not in self.state_fields: kep_state = False if 'Omega' not in self.state_fields and 'raan' not in self.state_fields: kep_state = False if 'anom' not in self.state_fields and 'mu0' not in self.state_fields: kep_state = False kwargs = {} if kep_state or cart_state: for key in self.state_fields: kwargs[key] = self.data[n][key] else: kwargs['state'] = self.data[n][self.state_fields] if 'oid' in self.fields: kwargs['oid'] = self.data[n]['oid'] obj=so.SpaceObject( propagator = self.propagator, propagator_options = self.propagator_options, propagator_args = self.propagator_args, parameters = parameters, epoch=Time( self.data[self.epoch_field['field']][n], format=self.epoch_field['format'], scale=self.epoch_field['scale'], ), **kwargs ) return obj
[docs] def add_field(self, name, dtype=None): '''Add a field to the population data. ''' data_tmp = self.data.copy() self.fields.append(name) if dtype is None: dtype = Population._default_dtype self.dtypes.append(dtype) self.allocate(len(data_tmp)) for key in self.fields: if key != name: self.data[key] = data_tmp[key]
[docs] def print(self, n=None, fields=None): if n is None: n = slice(None, None, None) if fields is None: fields = self.fields data = self.data[n][fields] if isinstance(data, np.void): data = [[x for x in data]] return tabulate(data, headers=fields)
def __str__(self): return self.print() def __getitem__(self, key): if isinstance(key, tuple): if len(key) == 2: is_slice = False for _key in key: if isinstance(_key, slice): is_slice = True if is_slice: tmp_data = np.empty((self.data.shape[0], len(self.fields)), dtype=np.dtype(self._default_dtype)) for ind, col in enumerate(self.fields): col_data = self.data[col] try: tmp_data[0,ind] = col_data[0] convertable = True except ValueError: convertable = False if convertable: tmp_data[:,ind] = col_data else: tmp_data[:,ind] = np.nan return tmp_data[key[0],key[1]] else: return self.data[self.fields[key[1]]][key[0]] else: raise Exception('Too many incidences given, only supports 2') elif isinstance(key, str): if key in self.fields: return self.data[key] else: raise Exception('No such column: {}'.format(key)) elif isinstance(key, int): if key < self.data.shape[0]: return self.data[key] else: raise Exception('Row number {} outside range of population {}'.format(key, self.data.shape[0])) else: raise Exception('Key type "{}" not supported'.format(type(key))) def __setitem__(self, key, data): if isinstance(key, tuple): if len(key) == 2: row_iter = list(range(len(self)))[key[0]] head_iter = self.fields[key[1]] if not isinstance(row_iter,list): row_point = True row_iter = [row_iter] else: row_point = False if not isinstance(head_iter,list): col_point = True head_iter = [head_iter] else: col_point = False if row_point and col_point: self.data[head_iter[0]][row_iter[0]] = data else: for coli, head in enumerate(head_iter): for rowi, row_num in enumerate(row_iter): if col_point: self.data[head][row_num] = data[rowi] elif row_point: self.data[head][row_num] = data[coli] else: self.data[head][row_num] = data[rowi][coli] else: raise Exception('Too many incidences given, only supports 2') elif isinstance(key, str): if key in self.fields: self.data[key] = data else: raise Exception('No such column: {}'.format(key)) elif isinstance(key, int): if key < self.data.shape[0]: for coli, head in enumerate(self.fields): self.data[head][key] = data[coli] else: raise Exception('Row number {} outside range of population {}'.format(key, self.data.shape[0])) else: raise Exception('Key type "{}" not supported'.format(type(key))) def __iter__(self): self.__num = 0 return self def __next__(self): if self.__num < self.data.shape[0]: ret = self.get_object(self.__num) self.__num += 1 return ret else: raise StopIteration @property def generator(self): for obj in self: yield obj
[docs] def save(self, fname): if isinstance(fname, str): fname = pathlib.Path(fname) with h5py.File(fname,"w") as hf: hf.create_dataset('data', data=self.data) hf.create_dataset('fields', data=np.array(self.fields), ) hf.create_dataset('space_object_fields', data=np.array(self.space_object_fields), ) hf.create_dataset('dtypes', data=np.array(self.dtypes), ) hf.create_dataset('epoch_field', data=np.array(self.epoch_field), ) hf.create_dataset('state_fields', data=np.array(self.state_fields), )
[docs] @classmethod def load(cls, fname, propagator, propagator_options = {}, propagator_args = {}): if isinstance(fname, str): fname = pathlib.Path(fname) with h5py.File(fname,"r") as hf: pop = cls( fields = hf['fields'].value[()], dtypes = hf['dtypes'].value[()], space_object_fields = hf['space_object_fields'].value[()], state_fields = hf['state_fields'].value[()], epoch_field = hf['epoch_field'].value[()], propagator = propagator, propagator_args = propagator_options, propagator_options = propagator_args, ) pop.data = hf['data'].value[()] return pop
#python 2.7 compliance Population.next = Population.__next__