#!/usr/bin/env python
'''Contains all helper functions to automate parallelization with MPI, handle caching and stepping of simulations.
'''
#Python standard import
import pathlib
import shutil
from collections import OrderedDict
import logging
import copy
import pickle
import datetime
from glob import glob
import os
#Third party import
import h5py
import numpy as np
try:
from mpi4py import MPI
comm = MPI.COMM_WORLD
except ImportError:
comm = None
#Local import
from . import profiling
[docs]def mpi_wrap_master_thread(func):
'''Wrap function to only execute on thread rank=0
'''
def master_th_func(*args, **kwargs):
if comm is not None:
if comm.rank == 0:
ret = func(*args, **kwargs)
else:
ret = None
comm.barrier()
else:
ret = func(*args, **kwargs)
return ret
return master_th_func
[docs]@mpi_wrap_master_thread
def mpi_mkdir(path):
'''Make directory on thread rank=0
'''
path.mkdir(exist_ok=True)
[docs]@mpi_wrap_master_thread
def mpi_rmdir(path):
'''Remove directory on thread rank=0 with :code:`shutil.rmtree`
'''
shutil.rmtree(path)
[docs]@mpi_wrap_master_thread
def mpi_copy(src, dst, linkfiles=False):
'''Copy path on thread rank=0 with :code:`shutil.copytree` or :code:`copy2` for files. If :code:`linkfiles` is true, files are soft-linked rather then copied.
'''
if src.is_dir():
if linkfiles:
shutil.copytree(src, dst, copy_function=os.link)
else:
shutil.copytree(src, dst)
else:
if linkfiles:
os.link(src, dst)
else:
shutil.copy2(src, dst)
[docs]def log_exceptions(func):
'''If instance has a logger, log exceptions raised by this method.
'''
def wrapped_step(self, *args, **kwargs):
try:
rets = func(self, *args, **kwargs)
except BaseException as err:
if hasattr(self, 'logger'):
if self.logger is not None:
self.logger.exception(f'\nargs: {args}\n kwargs: {kwargs}')
raise err
return rets
return wrapped_step
[docs]def MPI_single_process(process_id):
'''Simulation step single process method restriction decorator.
:param int process_id: The process id the wrapped function should only execute on. All other processes return :code:`None`.
'''
def step_wrapping(func):
def wrapped_step(self, *args, **kwargs):
if comm is not None:
if comm.rank == process_id:
rets = func(self, *args, **kwargs)
else:
rets = None
else:
rets = func(self, *args, **kwargs)
return rets
if hasattr(func, '_simulation_step'):
wrapped_step._simulation_step = func._simulation_step
return wrapped_step
return step_wrapping
[docs]def MPI_action(action, iterable = False, root = 0):
'''Simulation step MPI post step action decorator.
:param str action: Mode of operations on node-data communication, available options are "gather", "gather-clear", "bcast" and "barrier".
:param bool iterable: Indicates if the "gather", "gather-clear" or "bcast" should consider an iterable (that has been parallelized with MPI).
:param int root: The target MPI process for the "gather", "gather-clear" and for the source process for "bcast" if :code:`iterable=False`.
'''
def step_wrapping(func):
def wrapped_step(self, *args, **kwargs):
rets = func(self, *args, **kwargs)
if comm is None:
return rets
if iterable:
mpi_inds = []
for thrid in range(comm.size):
mpi_inds.append(range(thrid, len(rets), comm.size))
if action == 'barrier':
comm.barrier()
elif action.startswith('gather'):
if iterable:
if comm.rank == root:
for thr_id in range(comm.size):
if thr_id != root:
for ind in mpi_inds[thr_id]:
rets[ind] = comm.recv(source=thr_id, tag=ind)
else:
for ind in mpi_inds[comm.rank]:
comm.send(rets[ind], dest=root, tag=ind)
if action == 'gather-clear':
if comm.rank != root:
for ind in mpi_inds[comm.rank]:
rets[ind] = None
else:
all_rets = [None]*comm.size
all_rets[comm.rank] = rets
if comm.rank == root:
for thr_id in range(comm.size):
if thr_id != root:
all_rets[thr_id] = comm.recv(source=thr_id, tag=thr_id)
else:
comm.send(all_rets[comm.rank], dest=root, tag=comm.rank)
if action == 'gather-clear':
if comm.rank != root:
all_rets[comm.rank] = None
rets = all_rets
elif action == 'bcast':
if iterable:
for thr_id in range(comm.size):
for ind in mpi_inds[thr_id]:
rets[ind] = comm.bcast(rets[ind], root=thr_id)
else:
rets = comm.bcast(rets, root=root)
return rets
if hasattr(func, '_simulation_step'):
wrapped_step._simulation_step = func._simulation_step
return wrapped_step
return step_wrapping
[docs]def iterable_step(iterable, MPI=False, log=False, reduce=None):
'''Simulation step iteration decorator.
:param str/list iterable: The name/list of names of the instance properties (fetched using :code:`getattr`) to iterate over. Can be multiple levels, e.g. :code:`object.subobject.a_list`.
:param bool MPI: Determines if the iteration should be MPI-parallelized
:param bool log: Use the :code:`self.logger` instance, if it exists, to log the execution of the iteration.
:param function reduce: A pointer to the binary-function used to reduce the results.
'''
reduce_ = reduce;
if isinstance(iterable, str):
iterable = [iterable]
def step_wrapping(func):
def wrapped_step(self, *args, **kwargs):
all_attrs = []
for var in iterable:
subvars = var.split('.')
for vari, subvar in enumerate(subvars):
if vari == 0:
obj = getattr(self, subvar)
else:
obj = getattr(obj, subvar)
all_attrs.append(obj)
if len(all_attrs) == 1:
attr = all_attrs[0]
else:
attr = list(zip(*all_attrs))
if MPI and comm is not None:
_iter = list(range(comm.rank, len(attr), comm.size))
else:
_iter = list(range(len(attr)))
if reduce_ is None:
rets = [None]*len(attr)
else:
rets = None
step_name = kwargs.get('_step_name', None)
profiler_name = f'Simulation:iterable_step_{step_name}'
_iters = 0
_total = len(_iter)
for index in _iter:
if log and self.profiler is not None:
self.profiler.start(profiler_name)
item = attr[index]
if hasattr(func, '_simulation_step'):
kwargs['_iterable_index'] = index
ret = func(self, index, item, *args, **kwargs)
if reduce_ is None:
rets[index] = ret
else:
rets = reduce_(rets, ret)
del ret
if log and self.profiler is not None:
self.profiler.stop(profiler_name)
_iters += 1
if log and self.logger is not None:
if self.profiler is not None:
_spent = self.profiler.total(name=profiler_name)
_est_left = (_total - _iters)*self.profiler.mean(name=profiler_name)
self.logger.always(f'Simulation:{step_name}:iterable_step: {_iters}/{_total}\n'
+ f'[Elapsed ] {str(datetime.timedelta(seconds=_spent))} | '
+ f'[Time left] {str(datetime.timedelta(seconds=_est_left))}'
)
else:
self.logger.always(f'Simulation:{step_name}:iterable_step: {_iters}/{_total}')
if log and self.profiler is not None:
if step_name is None:
del self.profiler.exec_times[profiler_name]
return rets
wrapped_step._simulation_step = True
return wrapped_step
return step_wrapping
[docs]def store_step(store, iterable=False):
'''Simulation step storing decorator
:param str/list store: The name/list names of the properties to save the method return as (set using :code:`setattr`). Can be multiple levels, e.g. :code:`object.subobject.a_property`. The order of the names correspond to the order of method returned variables.
:param bool iterable: Determines if the return of the method is an iteration or not. If its an iteration, it splits the return values into different lists based on the number of variables.
'''
if isinstance(store, str):
store = [store]
def step_wrapping(func):
def wrapped_step(self, *args, **kwargs):
rets = func(self, *args, **kwargs)
for si, var in enumerate(store):
subvars = var.split('.')
if len(subvars) == 1:
obj = self
name = subvars[0]
else:
for vari, subvar in enumerate(subvars[:-1]):
if vari == 0:
obj = getattr(self, subvar)
else:
obj = getattr(obj, subvar)
name = subvars[-1]
if iterable:
iter_obj = [None]*len(rets)
for index in range(len(rets)):
if len(store) == 1 or rets[index] is None:
iter_obj[index] = rets[index]
else:
iter_obj[index] = rets[index][si]
setattr(obj, name, iter_obj)
else:
if len(store) == 1 or rets is None:
setattr(obj, name, rets)
else:
setattr(obj, name, rets[si])
return rets
wrapped_step._simulation_step = True
return wrapped_step
return step_wrapping
[docs]def iterable_cache(steps, caches, MPI=False, log=False, reduce=None):
'''Simulation step cache iteration decorator
:param str/list steps: The name/list of names of the cached steps to be iterated over. It uses the step name to find the files in the corresponding folder.
:param str/list caches: The name/list of cache-methods to be used to load the caches of the steps.
:param bool MPI: Determines if the iteration should be MPI-parallelized.
:param bool log: Use the :code:`self.logger` instance, if it exists, to log the execution of the iteration.
:param function reduce: A pointer to the binary-function used to reduce the results.
'''
reduce_ = reduce;
if isinstance(steps, str):
steps = [steps]
if isinstance(caches, str):
caches = [caches]
def step_wrapping(func):
def wrapped_step(self, *args, **kwargs):
all_files = []
all_indecies = []
for step, cache in zip(steps, caches):
dir_ = self.get_path(step)
if not dir_.is_dir():
raise ValueError(f'Input step {step} has no cache {cache}')
files = [pathlib.Path(x) for x in glob(str(dir_ / f'*'))]
indecies = [int(file.stem.split('_')[0]) for file in files]
files = [x for _, x in sorted(zip(indecies,files), key=lambda pair: pair[0])]
indecies = sorted(indecies)
all_files += [files]
all_indecies += [indecies]
if len(steps) == 1:
files_lst = [[x] for x in all_files[0]]
else:
#check all inds exist
for index, inds in enumerate(zip(*all_indecies)):
if len(set(inds)) > 1:
raise ValueError(f'Missing index {inds}')
files_lst = list(zip(*all_files))
indecies = all_indecies[0]
if MPI and comm is not None:
_iter = list(range(comm.rank, len(indecies), comm.size))
else:
_iter = list(range(len(indecies)))
if reduce_ is None:
rets = [None]*len(indecies)
else:
rets = None
step_name = kwargs.get('_step_name', None)
profiler_name = f'Simulation:iterable_cache:{step_name}'
_iters = 0
_total = len(_iter)
for index__ in _iter:
index = indecies[index__]
files = files_lst[index__]
if log and self.profiler is not None:
self.profiler.start(profiler_name)
item = []
for cache, fname in zip(caches, files):
lfunc = getattr(self, f'load_{cache}')
item += [lfunc(fname)]
if len(caches) == 1:
item = item[0]
if hasattr(func, '_simulation_step'):
kwargs['_iterable_index'] = index
ret = func(self, index, item, *args, **kwargs)
if reduce_ is None:
rets[index__] = ret
else:
rets = reduce_(rets, ret)
del ret
if log and self.profiler is not None:
self.profiler.stop(profiler_name)
_iters += 1
if log and self.logger is not None:
if self.profiler is not None:
_spent = self.profiler.total(name=profiler_name)
_est_left = (_total - _iters)*self.profiler.mean(name=profiler_name)
self.logger.always(f'Simulation:{step_name}:iterable_cache: {_iters}/{_total}\n'
+ f'[Elapsed ] {str(datetime.timedelta(seconds=_spent))} | '
+ f'[Time left] {str(datetime.timedelta(seconds=_est_left))}'
)
else:
self.logger.always(f'Simulation:{step_name}:iterable_cache: {_iters}/{_total}')
if log and self.profiler is not None:
if step_name is None:
del self.profiler.exec_times[profiler_name]
return rets
wrapped_step._simulation_step = True
return wrapped_step
return step_wrapping
[docs]def cached_step(caches):
'''Simulation step caching decorator
:param str caches: Is a list of strings for the caches to be used, available by default is "h5" and "pickle". Custom caches are implemented by implementing methods with the string name but prefixed with :code:`load_` and :code:`save_`.
'''
if isinstance(caches, str):
caches = [caches]
def step_wrapping(func):
def wrapped_step(self, *args, **kwargs):
step = kwargs.get('_step_name', 'cached_data')
fname_parts = kwargs.pop('_fname_parts', ['data'])
index = kwargs.get('_iterable_index', None)
if index is None:
index_lst = []
else:
index_lst = [str(index)]
dir_ = self.get_path(step)
if not dir_.is_dir():
mpi_mkdir(dir_)
loaded_ = False
#load
for cache in caches:
fname = dir_ / f'{"_".join(index_lst + fname_parts)}.{cache}'
if fname.is_file():
lfunc = getattr(self, f'load_{cache}')
try:
ret = lfunc(fname)
loaded_ = True
except (OSError, EOFError, UnicodeError, ):
fname.unlink()
if loaded_:
break
#if there are no caches
if not loaded_:
ret = func(self, *args, **kwargs)
#save
for cache in caches:
fname = dir_ / f'{"_".join(index_lst + fname_parts)}.{cache}'
sfunc = getattr(self, f'save_{cache}')
sfunc(fname, ret)
return ret
wrapped_step._simulation_step = True
return wrapped_step
return step_wrapping
[docs]class Simulation:
'''Convenience simulation handler, creates a step-by-step simulation sequence and creates file system structure for saving of data to disk.
:param Scheduler scheduler: A scheduler instance to run. This input is used to assure that the same logger and profiler is used for the Simulation and the Scheduler.
:param str/pathlib.Path root: The path to the root folder where all files will be stored.
:param bool logger: If :code:`False`, do not instantiate a logger.
:param bool profiler: If :code:`False`, do not instantiate a profiler.
'''
[docs] def __init__(self, scheduler, root, logger=True, profiler=True, **kwargs):
self.steps = OrderedDict()
self.scheduler = scheduler
if not isinstance(root, pathlib.Path):
root = pathlib.Path(root)
self.root = root
self.branch_name = 'master'
if not self.root.is_dir():
mpi_mkdir(self.root)
_master = self.root / self.branch_name
if not _master.is_dir():
mpi_mkdir(_master)
self.make_paths()
if logger:
self.logger = profiling.get_logger(
'Simulation',
path = self.log_path,
file_level = kwargs.get('file_level', logging.INFO),
term_level = kwargs.get('term_level', logging.INFO),
)
self.scheduler.logger = self.logger
else:
self.logger = None
if profiler:
self.profiler = profiling.Profiler()
self.scheduler.profiler = self.profiler
else:
self.profiler = None
[docs] def save_pickle(self, path, data):
with open(path, 'wb') as h:
pickle.dump(data, h)
[docs] def load_pickle(self, path):
if path.is_file():
with open(path, 'rb') as h:
ret = pickle.load(h)
return ret
[docs] def save_h5(self, path, data):
with h5py.File(path,'w') as h:
if isinstance(data, dict):
for key in data:
if isinstance(data[key], np.ndarray):
h.create_dataset(key, data=data[key])
else:
h.attrs[key] = data[key]
if isinstance(data, list) or isinstance(data, tuple):
for ind in range(len(data)):
h.attrs['__ret_len'] = len(data)
if isinstance(data[ind], np.ndarray):
h.create_dataset(f'saved_data__{ind}', data=data[ind])
else:
h.attrs[f'saved_data__{ind}'] = data[ind]
else:
h.create_dataset('__saved_data', data=data)
[docs] def load_h5(self, path):
if path.is_file():
with h5py.File(path,'r') as h:
if '__saved_data' in h:
ret = h['__saved_data'][()].copy()
elif '__ret_len' in h.attrs:
ret = [None]*copy.copy(h.attrs['__ret_len'])
for ind in range(len(ret)):
key = f'saved_data__{ind}'
if key in h:
ret[ind] = h[key][()].copy()
else:
ret[ind] = copy.copy(h.attrs[key])
else:
ret = {}
for key in h:
ret[key] = h[key][()].copy()
for key in h.attrs:
ret[key] = copy.copy(h.attrs[key])
return ret
[docs] def make_paths(self):
'''Make all the folder for the current branch according to :code:`self.paths`.
'''
for path in self.paths:
mpi_mkdir(self.get_path(path))
@property
def paths(self):
'''List of the name of all folders
'''
return [key for key in self.steps] + ['logs']
@property
def log_path(self):
'''Path to the current log-output folder
'''
return self.root / self.branch_name / 'logs'
[docs] def get_path(self, name=None):
'''Given a relative file path, get the absolute path including root and branch.
'''
if name is None:
return self.root / self.branch_name
else:
return self.root / self.branch_name / name
[docs] def delete(self, branch):
'''Delete branch.
'''
if (self.root / branch).is_dir():
mpi_rmdir(self.root / branch)
if self.branch_name == branch:
mpi_mkdir(self.root / self.branch_name)
self.make_paths()
[docs] def branch(self, name, empty=False, linkfiles=None):
'''Create branch by creating a copy of the current branch state and checkout that branch. If the branch exists, just checkout that branch.
:param str name: Name of the new branch
:param bool empty: If :code:`True` do not copy the state of the current branch.
:param list/bool linkfiles: If a list of paths that should be soft-linked rather then copied. If :code:`True`, soft-link all files.
:return: None
'''
if (self.root / name).is_dir():
if self.logger is not None:
self.logger.info(f'Branch "{name}" already exists')
else:
if empty:
mpi_mkdir(self.root / name)
for path in self.paths:
mpi_mkdir(self.root / name / path)
else:
if linkfiles is None:
mpi_copy(self.root / self.branch_name, self.root / name, linkfiles=False)
elif isinstance(linkfiles, list):
listing = pathlib.Path(self.root / self.branch_name).glob('./*')
for pth in listing:
if pth.name in linkfiles:
mpi_copy(pth, self.root / name / pth.name, linkfiles=True)
else:
mpi_copy(pth, self.root / name / pth.name, linkfiles=False)
elif linkfiles:
mpi_copy(self.root / self.branch_name, self.root / name, linkfiles=True)
else:
raise TypeError(f'linkfiles type "{type(linkfiles)}" not supported')
# Make sure log directory exists
mpi_mkdir(self.root / name / 'logs')
self.checkout(name)
[docs] def checkout(self, branch):
'''Change to branch.
'''
self.branch_name = branch
self.logger = profiling.change_logfile(self.logger, self.log_path)
[docs] @log_exceptions
def run(self, step = None, *args, **kwargs):
'''Run specific step with the supplied arguments or run all steps'''
self.make_paths()
if step is None:
for name, func in self.steps.items():
if self.profiler is not None: self.profiler.start(f'Simulation:run:{name}')
if self.logger is not None: self.logger.info(f'Simulation:run:{name}')
if hasattr(func, '_simulation_step'):
func(*args, _step_name=name, **kwargs)
else:
func(*args, **kwargs)
if self.profiler is not None: self.profiler.stop(f'Simulation:run:{name}')
if self.logger is not None: self.logger.info(f'Simulation:run:{name} [completed]')
else:
func = self.steps[step]
if self.profiler is not None: self.profiler.start(f'Simulation:run:{step}')
if self.logger is not None: self.logger.info(f'Simulation:run:{step}')
if hasattr(func, '_simulation_step'):
func(*args, _step_name=step, **kwargs)
else:
func(*args, **kwargs)
if self.profiler is not None: self.profiler.stop(f'Simulation:run:{step}')
if self.logger is not None: self.logger.info(f'Simulation:run:{step} [completed]')