#!/usr/bin/env python
'''Basis for developing a scheduler for tracking multiple space objects.
'''
from abc import abstractmethod
import numpy as np
import pyorb
from ..controller import Tracker
from .scheduler import Scheduler
from ..passes import equidistant_sampling
[docs]class Tracking(Scheduler):
'''#TODO: Docstring
'''
[docs] def __init__(self,
radar,
space_objects,
end_time,
epoch,
start_time = 0.0,
controller = Tracker,
controller_args = dict(return_copy=True),
max_dpos = 1e3,
max_samp = 30.0,
profiler = None,
logger = None,
use_pass_states = True,
calculate_max_snr = False,
collect_passes = False,
):
super().__init__(radar, profiler=profiler, logger=logger)
self.epoch = epoch
self.controller = controller
self.controller_args = controller_args
self.space_objects = space_objects
self.start_time = start_time
self.end_time = end_time
self.collect_passes = collect_passes
self.max_dpos = max_dpos
self.max_samp = max_samp
self.use_pass_states = use_pass_states
self.calculate_max_snr = calculate_max_snr
self.states = [None]*len(space_objects)
self.states_t = [None]*len(space_objects)
self.passes = [None]*len(space_objects)
if self.use_pass_states:
self.measurements = [None]*len(space_objects)
else:
self.measurements = None
[docs] def update(self):
for ind in range(len(self.space_objects)):
self.get_passes(ind)
[docs] def get_passes(self, ind):
'''#TODO: Docstring
'''
dt = (self.space_objects[ind].epoch - self.epoch).to_value('sec')
if isinstance(self.space_objects[ind].state, pyorb.Orbit):
t = equidistant_sampling(
orbit = self.space_objects[ind].state,
start_t = self.start_time,
end_t = self.end_time,
max_dpos = self.max_dpos,
)
else:
t = np.arange(self.start_time, self.end_time, self.max_samp)
if self.logger is not None:
self.logger.info(f'Tracking:get_passes(ind={ind}):propagating {len(t)} steps')
if self.profiler is not None:
self.profiler.start('Tracking:get_passes:propagating')
states = self.space_objects[ind].get_state(t - dt)
if self.use_pass_states:
self.states[ind] = states
self.states_t[ind] = t
if self.profiler is not None:
self.profiler.stop('Tracking:get_passes:propagating')
if self.logger is not None:
self.logger.info(f'Tracking:get_passes(ind={ind}):propagating complete')
if self.profiler is not None:
self.profiler.start('Tracking:get_passes:find_passes')
self.passes[ind] = self.radar.find_passes(t, states, cache_data = True)
if self.profiler is not None:
self.profiler.stop('Tracking:get_passes:find_passes')
#we may need SNR to plan observations
for txi in range(len(self.radar.tx)):
for rxi in range(len(self.radar.rx)):
if self.logger is not None:
self.logger.info(f'Tracking:get_passes(ind={ind}):tx{txi}-rx{rxi} {len(self.passes[ind][txi][rxi])} passes')
if not self.calculate_max_snr:
continue
for ps in self.passes[ind][txi][rxi]:
if self.profiler is not None:
self.profiler.start('Tracking:get_passes:calculate_max_snr')
ps.calculate_max_snr(
rx = self.radar.rx[rxi],
tx = self.radar.tx[txi],
diameter = self.space_objects[ind].d,
)
if self.profiler is not None:
self.profiler.stop('Tracking:get_passes:calculate_max_snr')
return self.passes[ind], states, t
[docs] @abstractmethod
def set_measurements(self, *args, **kwargs):
'''#TODO: Docstring
'''
pass
[docs] def get_controllers(self):
'''#TODO: Docstring
'''
if self.logger is not None:
self.logger.debug(f'Tracking:get_controllers')
ctrls = []
for ind in range(len(self.space_objects)):
if self.collect_passes:
if self.use_pass_states:
minds = self.measurements[ind]
states = self.states[ind][:3,minds]
t = self.states_t[ind][minds]
else:
states = self.states[ind][:3,:]
t = self.states_t[ind][:]
ctrl = self.controller(
radar = self.radar,
t=t,
t0=0.0,
ecefs = states[:3,:],
**self.controller_args
)
ctrl.meta['target'] = f'Object {self.space_objects[ind].oid}'
ctrls.append(ctrl)
else:
for pass_ind in range(len(self.states_t[ind])):
if self.use_pass_states:
minds = self.measurements[ind][pass_ind]
states = self.states[ind][pass_ind][:3,minds]
t = self.states_t[ind][pass_ind][minds]
else:
states = self.states[ind][pass_ind][:3,:]
t = self.states_t[ind][pass_ind][:]
ctrl = self.controller(
radar = self.radar,
t=t,
t0=0.0,
ecefs = states[:3,:],
**self.controller_args
)
ctrl.meta['target'] = f'Object {self.space_objects[ind].oid}'
ctrl.meta['pass'] = pass_ind
ctrls.append(ctrl)
return ctrls
[docs]class PriorityTracking(Tracking):
'''#TODO: Docstring
'''
[docs] def __init__(self,
radar,
space_objects,
end_time,
epoch,
timeslice,
allocation,
**kwargs
):
self.priority = kwargs.pop('priority', None)
super().__init__(radar, space_objects, end_time, epoch, **kwargs)
self.timeslice = timeslice
self.allocation = allocation
if self.priority is None:
self.priority = np.ones((len(self.space_objects),), dtype=np.float64)
[docs] def set_measurements(self):
'''#TODO: Docstring
'''
if not self.collect_passes:
raise NotImplementedError()
total_measurements = int(self.allocation/self.timeslice)
if not isinstance(self.priority, np.ndarray):
pri = np.array(self.priority)
pri = pri/pri.sum()
else:
pri = self.priority/self.priority.sum()
#per object
measurements = np.floor(pri*total_measurements).astype(np.int64)
for ind in range(len(self.space_objects)):
if self.use_pass_states:
all_inds = []
for txi in range(len(self.passes[ind])):
for rxi in range(len(self.passes[ind][txi])):
all_inds += [ps.inds for ps in self.passes[ind][txi][rxi]]
if len(all_inds) == 0:
self.measurements[ind] = np.empty((0,), dtype=np.int64)
continue
all_inds = np.concatenate(all_inds, axis=0)
all_inds = np.unique(all_inds)
#uniform distribution
dt = int(len(all_inds)/measurements[ind])
if dt == 0:
dt = 1
all_inds = all_inds[::dt]
self.measurements[ind] = all_inds
else:
raise NotImplementedError()