Source code for sorts.scheduler.scheduler

#!/usr/bin/env python

'''Scheduler base class.

'''
#Python standard import
from abc import ABC, abstractmethod


#Third party import
import numpy as np

#Local import


[docs]class Scheduler(ABC): '''A Scheduler for executing time-slices of different radar controllers. #TODO: Docstring '''
[docs] def __init__(self, radar, profiler=None, logger=None): self.radar = radar self.logger = logger self.profiler = profiler
[docs] @abstractmethod def update(self, *args, **kwargs): '''Update the scheduler information. ''' pass
[docs] @abstractmethod def get_controllers(self): '''This should init a list of controllers and set the `t` (global time) and `t0` (global time reference for controller) variables on them for their individual time samplings. ''' pass
[docs] def generate_schedule(self, t, generator): '''Takes times and a corresponding generator that returns radar instances to generate a radar schedule. ''' raise NotImplementedError()
[docs] def calculate_observation(self, txrx_pass, t, generator, **kwargs): '''Takes a pass over a tx-rx pair and the corresponding evaluated times and generator that returns radar instances to generate a set of observed data. ''' raise NotImplementedError()
[docs] @staticmethod def chain_generators(generators): for generator in generators: yield from generator
[docs] def schedule(self, **kwargs): '''#TODO: Docstring ''' ctrls = self.get_controllers() times = np.concatenate([c.t for c in ctrls], axis=0) sched = Scheduler.chain_generators([c.run() for c in ctrls]) return self.generate_schedule(times, sched, **kwargs)
def __call__(self, start, stop): '''#TODO: Docstring ''' ctrls = self.get_controllers() check_t = lambda c: np.logical_and(c.t >= start, c.t <= stop) ctrls = [c for c in ctrls if np.any(check_t(c))] if len(ctrls) == 0: if self.logger is not None: self.logger.debug(f'Scheduler:__call__:No radar events found between {start:.1f} and {stop:.1f}') return None, None else: t = np.concatenate([c.t[check_t(c)] for c in ctrls], axis=0) if self.logger is not None: self.logger.debug(f'Scheduler:__call__:{len(t)} events found between {start:.1f} and {stop:.1f}') return t, Scheduler.chain_generators([c(c.t[check_t(c)] - c.t0) for c in ctrls])
[docs] def observe_passes(self, passes, **kwargs): '''#TODO: Docstring ''' data = [] for txi in range(len(passes)): data.append([]) for rxi in range(len(passes[txi])): data[-1].append([]) for ps in passes[txi][rxi]: t, generator = self(ps.start(), ps.end()) if generator is not None: pass_data = self.calculate_observation(ps, t, generator, **kwargs) else: pass_data = None data[-1][-1].append(pass_data) return data