Source code for sorts.scheduler.observed_parameters

#!/usr/bin/env python



import numpy as np

from .scheduler import Scheduler

from ..signals import hard_target_snr, hard_target_rcs
from ..passes import Pass

[docs]class ObservedParameters(Scheduler): '''Bi-static radar observation parameters of hard targets. **Parameters calculated** * time * signal to noise ratio * range * range rate * transmitter local pointing to target k * receiver pointing to target k * radar cross section #TODO: Docstring '''
[docs] def __init__(self, radar, logger=None, profiler=None, **kwargs): super().__init__( radar=radar, logger=logger, profiler=profiler, )
[docs] def calculate_observation_jacobian(self, txrx_pass, space_object, variables, deltas, transforms={}, **kwargs): '''Calculate the observation and its Jacobean of a pass of a specific space object given the current state of the Scheduler. The Jacobean assumes that the SpaceObject has a Orbit state. To perturb non Orbit states a custom implementation is needed. NOTE: During the numerical calculation of the Jacobean only the range and range rates are calculated and `calculate_snr=False`. #TODO: Docstring ''' if self.logger is not None: self.logger.debug(f'Obs.Param.:calculate_observation_jacobian:{variables}, deltas={deltas}') if self.profiler is not None: self.profiler.start('Obs.Param.:calculate_observation_jacobian') t, generator = self(txrx_pass.start(), txrx_pass.end()) if generator is None: return None, None if self.profiler is not None: self.profiler.start('Obs.Param.:calculate_observation_jacobian:reference') snr_limit = kwargs.get('snr_limit', True) kwargs['snr_limit'] = False data0 = self.calculate_observation(txrx_pass, t, generator, space_object, **kwargs) snr_inds = np.full(data0['snr'].shape, True, dtype=np.bool) if snr_limit: snr_db = np.log10(data0['snr'])*10.0 snr_inds[np.logical_or(np.isnan(snr_db),np.isinf(snr_db))] = False snr_inds[snr_inds] = snr_db[snr_inds] > self.radar.min_SNRdb for key in data0: data0[key] = data0[key][...,snr_inds] if self.profiler is not None: self.profiler.stop('Obs.Param.:calculate_observation_jacobian:reference') if data0 is None: return None, None t_filt = t[snr_inds] J = np.zeros([len(t_filt)*2,len(variables)], dtype=np.float64) kwargs['calculate_snr'] = False for ind, var in enumerate(variables): if self.profiler is not None: self.profiler.start(f'Obs.Param.:calculate_observation_jacobian:d_{var}') dso = space_object.copy() if var in transforms: Tx = transforms[var][0](getattr(dso, var)) + deltas[ind] dx = transforms[var][1](Tx) else: dx = getattr(dso, var) + deltas[ind] dso.update(**{var: dx}) ddata = self.calculate_observation(txrx_pass, t, generator, dso, **kwargs) for key in data0: ddata[key] = ddata[key][...,snr_inds] dr = (ddata['range'] - data0['range'])/deltas[ind] dv = (ddata['range_rate'] - data0['range_rate'])/deltas[ind] J[:len(t_filt),ind]=dr J[len(t_filt):,ind]=dv if self.profiler is not None: self.profiler.stop(f'Obs.Param.:calculate_observation_jacobian:d_{var}') if self.profiler is not None: self.profiler.stop('Obs.Param.:calculate_observation_jacobian') return data0, J
[docs] def calculate_observation(self, txrx_pass, t, generator, space_object, calculate_snr=True, interpolator=None, snr_limit=True): '''Calculate the observation of a pass of a specific space object given the current state of the Scheduler. #TODO: Docstring ''' txi, rxi = txrx_pass.station_id if self.logger is not None: self.logger.debug(f'Obs.Param.:calculate_observation:(tx={txi}, rx={rxi}), len(t) = {len(t)}') if self.profiler is not None: self.profiler.start('Obs.Param.:calculate_observation') if calculate_snr: diam = space_object.d else: diam = None if self.profiler is not None: self.profiler.start('Obs.Param.:calculate_observation:get_state') if interpolator is not None: states = interpolator.get_state(t) else: states = space_object.get_state(t) if self.profiler is not None: self.profiler.stop('Obs.Param.:calculate_observation:get_state') snr = np.empty((len(t),), dtype=np.float64) rcs = np.empty((len(t),), dtype=np.float64) keep = np.full((len(t),), True, dtype=np.bool) if self.profiler is not None: self.profiler.start('Obs.Param.:calculate_observation:enus,range,range_rate') enus = [ self.radar.tx[txi].enu(states), self.radar.rx[rxi].enu(states), ] ranges = [Pass.calculate_range(enu) for enu in enus] range_rates = [Pass.calculate_range_rate(enu) for enu in enus] if self.profiler is not None: self.profiler.stop('Obs.Param.:calculate_observation:enus,range,range_rate') if self.profiler is not None: self.profiler.start('Obs.Param.:calculate_observation:generator') for ti, mrad in enumerate(generator): radar, meta = mrad if self.profiler is not None: self.profiler.start('Obs.Param.:calculate_observation:snr-step') if radar.tx[txi].enabled and radar.rx[rxi].enabled and calculate_snr: if self.profiler is not None: self.profiler.start('Obs.Param.:calculate_observation:snr-step:gain') if len(radar.tx[txi].beam.pointing.shape) > 1: tx_g = np.max([ radar.tx[txi].beam.gain(enus[0][:3,ti], ind={'pointing': pi}) for pi in range(radar.tx[txi].beam.pointing.shape[1]) ]) else: tx_g = radar.tx[txi].beam.gain(enus[0][:3,ti]) if len(radar.rx[rxi].beam.pointing.shape) > 1: rx_g = np.max([ radar.rx[rxi].beam.gain(enus[1][:3,ti], ind={'pointing': pi}) for pi in range(radar.rx[rxi].beam.pointing.shape[1]) ]) else: rx_g = radar.rx[rxi].beam.gain(enus[1][:3,ti]) if self.profiler is not None: self.profiler.stop('Obs.Param.:calculate_observation:snr-step:gain') self.profiler.start('Obs.Param.:calculate_observation:snr-step:snr') snr[ti] = hard_target_snr( tx_g, rx_g, radar.rx[rxi].wavelength, radar.tx[txi].power, ranges[0][ti], ranges[1][ti], diameter=diam, bandwidth=radar.tx[txi].coh_int_bandwidth, rx_noise_temp=radar.rx[rxi].noise, ) if self.profiler is not None: self.profiler.stop('Obs.Param.:calculate_observation:snr-step:snr') rcs[ti] = hard_target_rcs( wavelength=radar.rx[rxi].wavelength, diameter=diam, ) if snr_limit: snr_db = np.log10(snr[ti])*10.0 if np.isnan(snr_db) or np.isinf(snr_db): keep[ti] = False else: keep[ti] = snr_db > radar.min_SNRdb else: snr[ti] = np.nan rcs[ti] = np.nan if calculate_snr: keep[ti] = False if self.profiler is not None: self.profiler.stop('Obs.Param.:calculate_observation:snr-step') if self.profiler is not None: self.profiler.stop('Obs.Param.:calculate_observation:generator') data = dict( t = t, snr = snr, range = ranges[0] + ranges[1], range_rate = range_rates[0] + range_rates[1], tx_k = enus[0][:3,:]/ranges[0], rx_k = enus[1][:3,:]/ranges[1], rcs = rcs, ) if np.any(keep): for key in data: data[key] = data[key][...,keep] else: data = None if self.profiler is not None: self.profiler.stop('Obs.Param.:calculate_observation') if self.logger is not None: self.logger.debug(f'Obs.Param.:calculate_observation:complete') return data