import multiprocessing
import signal
import traceback
from collections import OrderedDict
from functools import wraps
from io import BytesIO
from itertools import count
from logging import getLogger
import cloudpickle
import numpy as num
mp_context = multiprocessing
# mp_context = multiprocessing.get_context("spawn")
# monkey patch pickling in multiprocessing
if False:
@classmethod
def dumps(cls, obj, protocol=None):
buf = BytesIO()
cls(buf, protocol).dump(obj)
return buf.getbuffer()
mp_context.reduction.ForkingPickler = cloudpickle.CloudPickler
mp_context.reduction.ForkingPickler.dumps = cloudpickle.dumps
mp_context.reduction.ForkingPickler.loads = cloudpickle.loads
logger = getLogger("parallel")
# for sharing memory across processes
_shared_memory = OrderedDict()
_tobememshared = set([])
def get_process_id():
"""
    Returns the id (worker index) of the current process within the pool
"""
try:
current = mp_context.current_process()
n = current._identity[0]
except IndexError:
# in case of only one used core ...
n = 1
return n
def check_available_memory(filesize):
"""
Checks if the system memory can handle the given filesize.
Parameters
----------
filesize : float
        size in megabytes [MB]
"""
from psutil import virtual_memory
mem = virtual_memory()
    avail_mem_mb = mem.available / (1024**2)
    phys_mem_mb = mem.total / (1024**2)
logger.debug(
"Physical Memory [Mb] %f \n "
"Available Memory [Mb] %f \n " % (phys_mem_mb, avail_mem_mb)
)
    if filesize > phys_mem_mb:
        raise MemoryError(
            "The physical memory on this system (%f MB) is too small for"
            " the FFI setup configuration! The problem complexity has to be"
            " reduced (reduce the number of patches, stations, starttimes"
            " or durations, or reduce the sample rate of your data and"
            " synthetics), or the hardware needs to be upgraded!"
            % phys_mem_mb
        )
    if filesize > avail_mem_mb:
        logger.warning(
            "The Green's Function library filesize is larger than the"
            " available memory. It will likely spill into swap, which may"
            " slow down the calculations dramatically!"
        )
def exception_tracer(func):
"""
    Function decorator that re-raises an exception from a pool's child
    process with the original child traceback attached to its message.
"""
@wraps(func)
def wrapped_func(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
msg = "{}\n\nOriginal {}".format(e, traceback.format_exc())
print("Exception in " + func.__name__)
raise type(e)(msg)
return wrapped_func
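
# Illustrative usage sketch (not part of the original module): a toy worker
# function decorated with `exception_tracer`, so that an error raised inside a
# pool's child process is re-raised with the full child traceback attached.
@exception_tracer
def _example_square(x):
    # any exception raised here would carry the child-process traceback
    return x * x
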
class TimeoutException(Exception):
"""
Exception raised if a per-task timeout fires.
"""
    def __init__(self, jobstack=None):
        super(TimeoutException, self).__init__()
        self.jobstack = jobstack if jobstack is not None else []
# http://stackoverflow.com/questions/8616630/time-out-decorator-on-a-multprocessing-function
def overseer(timeout):
"""
    Function decorator that raises a TimeoutException if the decorated
    function has not returned after `timeout` seconds.
"""
def decorate(func):
def timeout_handler(signum, frame):
raise TimeoutException(traceback.format_stack())
        @wraps(func)
        def wrapped_f(*args, **kwargs):
            old_handler = signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(timeout)
            try:
                return func(*args, **kwargs)
            finally:
                # restore the old signal handler and cancel the alarm, also
                # if the wrapped function raised an exception
                signal.signal(signal.SIGALRM, old_handler)
                signal.alarm(0)

        return wrapped_f
return decorate
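
# Illustrative usage sketch (not part of the original module): limit a single
# call to at most 5 seconds. This relies on SIGALRM, so it only works on Unix
# and in the main thread of a process.
@overseer(5)
def _example_bounded_sum(niter):
    total = 0.0
    for i in range(niter):
        total += i**0.5
    return total
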
class WatchedWorker(object):
"""
Wrapper class for parallel execution of a task.
Parameters
----------
task : function to execute
work : List
of arguments to specified function
timeout : int
time [s] after which worker is fired, default 65536s
"""
def __init__(self, task, work, initializer=None, initargs=(), timeout=0xFFFF):
self.function = task
self.work = work
self.timeout = timeout
self.initializer = initializer
self.initargs = initargs
def run(self):
"""
Start working on the task!
"""
if self.initializer is not None:
self.initializer(*self.initargs)
try:
return self.function(*self.work)
except TimeoutException:
logger.warn("Worker timed out! Fire him! Returning: None!")
return None
def _pay_worker(worker):
"""
    Wrapper handed to the pool; applies the per-worker `overseer` timeout
    around :meth:`WatchedWorker.run`.
"""
return overseer(worker.timeout)(worker.run)()
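
# Illustrative usage sketch (not part of the original module): this is how
# `paripool` combines the pieces above; a task is wrapped into a WatchedWorker
# and executed through `_pay_worker`, which enforces the per-task timeout.
def _example_run_watched(task, args, timeout=10):
    worker = WatchedWorker(task, args, timeout=timeout)
    return _pay_worker(worker)
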
def paripool(
function,
workpackage,
nprocs=None,
chunksize=1,
timeout=0xFFFF,
initializer=None,
initargs=(),
worker_initializer=None,
winitargs=(),
):
"""
Initialises a pool of workers and executes a function in parallel by
forking the process. Does forking once during initialisation.
Parameters
----------
function : function
python function to be executed in parallel
workpackage : list
of iterables that are to be looped over/ executed in parallel usually
these objects are different for each task.
nprocs : int
number of processors to be used in parallel process
chunksize : int
number of work packages to throw at workers in each instance
timeout : int
time [s] after which processes are killed, default: 65536s
initializer : function
to init pool with may be container for shared arrays
initargs : tuple
of arguments for the initializer
worker_initializer : function
to initialize each worker process
winitargs : tuple
of argument to worker_initializer
"""
    def start_message(*args):
logger.debug("Starting %s" % mp_context.current_process().name)
def callback(result):
logger.info("\n Feierabend! Done with the work!")
if nprocs is None:
nprocs = mp_context.cpu_count()
if chunksize is None:
chunksize = 1
if nprocs == 1:
for work in workpackage:
if initializer is not None:
initializer(*initargs)
yield [function(*work)]
else:
pool = mp_context.Pool(
processes=nprocs, initializer=initializer, initargs=initargs
)
logger.debug("Worker timeout after %i second(s)" % timeout)
workers = [
WatchedWorker(
function,
work,
initializer=worker_initializer,
initargs=winitargs,
timeout=timeout,
)
for work in workpackage
]
pool_timeout = int(len(workpackage) / 3.0 * timeout / nprocs)
if pool_timeout < 100:
pool_timeout = 100
logger.debug("Overseer timeout after %i second(s)" % pool_timeout)
logger.debug("Chunksize: %i" % chunksize)
try:
yield pool.map_async(
_pay_worker, workers, chunksize=chunksize, callback=callback
).get(pool_timeout)
except mp_context.TimeoutError:
logger.error("Overseer fell asleep. Fire everyone!")
pool.terminate()
except KeyboardInterrupt:
logger.error("Got Ctrl + C")
traceback.print_exc()
pool.terminate()
else:
pool.close()
pool.join()
# reset process counter for tqdm progressbar
mp_context.process._process_counter = count(1)
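
# Illustrative usage sketch (not part of the original module): run a toy task
# over a list of argument tuples with two worker processes. The task has to be
# picklable, i.e. defined at module level. `paripool` is a generator, so the
# results have to be pulled from it.
def _example_add(a, b):
    return a + b


def _example_paripool():
    workpackage = [(i, 2 * i) for i in range(10)]
    for results in paripool(_example_add, workpackage, nprocs=2, chunksize=2):
        logger.info("Got results: %s", results)
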
def memshare(parameternames):
"""
Add parameters to set of variables that are to be put into shared
memory.
Parameters
----------
parameternames : list of str
        of names of :class:`pytensor.tensor.sharedvar.TensorSharedVariable`
"""
for paramname in parameternames:
if not isinstance(paramname, str):
raise ValueError(
                'Parameter "%s" cannot be memshared! Invalid name -'
                ' has to be of type "string"!' % paramname
)
_tobememshared.update(parameternames)
def memshare_sparams(shared_params):
"""
    For each parameter in a list of pytensor TensorSharedVariables
    we substitute the memory with a sharedctype using the
    multiprocessing library.

    The wrapped memory can then be used by other child processes,
    thereby synchronising different instances of a model across
    processes (e.g. for multi-cpu gradient descent using single-cpu
    pytensor code).

    The shared arrays and their shapes are stored in the module-level
    `_shared_memory` dict, keyed by parameter name.

    Parameters
    ----------
    shared_params : list
        of :class:`pytensor.tensor.sharedvar.TensorSharedVariable`

    Notes
    -----
    Modified from:
    https://github.com/JonathanRaiman/pytensor_lstm/blob/master/pytensor_lstm/shared_memory.py

        # define some pytensor function:
        myfunction = myfunction(20, 50, etc...)
        # wrap the memory of the pytensor variables:
        memshare_sparams(myfunction.get_shared())

    Then you can use this memory in child processes
    (see usage of `borrow_memory`).
"""
for param in shared_params:
original = param.get_value(True, True)
size = original.size
shape = original.shape
original.shape = size
logger.info("Allocating %s" % param.name)
ctypes = mp_context.RawArray(
"f" if original.dtype == num.float32 else "d", size
)
ctypes_numarr = num.ctypeslib.as_array(ctypes)
ctypes_numarr[:] = original
# remove large object from Shared to get through pickle size limitation
param.set_value(num.empty([1 for i in range(len(shape))]), borrow=True)
_shared_memory[param.name] = (ctypes_numarr, shape)
def borrow_memory(shared_param, memshared_instance, shape):
"""
    Attach memory that is shared across processes to a pytensor shared
    variable, so that spawned processes operate on the same underlying data.

Parameters
----------
shared_param : :class:`pytensor.tensor.sharedvar.TensorSharedVariable`
the pytensor shared variable where
shared memory should be used instead.
memshared_instance : :class:`multiprocessing.RawArray`
        the memory shared across processes (e.g. from `memshare_sparams`)
    shape : tuple
        shape of the shared instance
Notes
-----
    Modified from:
https://github.com/JonathanRaiman/pytensor_lstm/blob/master/pytensor_lstm/shared_memory.py
    For each process, run `borrow_memory` in the target function on the
    parameters whose memory you want to share across processes.
    In this example we have a model called "mymodel" with parameters stored
    in a list called "params". We loop through each pytensor shared variable
    and call `borrow_memory` on it to share memory across processes.

Examples
--------
    >>> def spawn_model(path, wrapped_params):
    ...     # prevent recompilation and arbitrary locks
    ...     pytensor.config.reoptimize_unpickled_function = False
    ...     pytensor.gof.compilelock.set_lock_status(False)
    ...     # load your function from its pickled instance (from path)
    ...     myfunction = MyFunction.load(path)
    ...     # for each parameter in your function apply the borrow-memory
    ...     # strategy to replace the internal parameter's memory with the
    ...     # across-process memory:
    ...     for param, (memshared_instance, shape) in zip(
    ...             myfunction.get_shared(), wrapped_params):
    ...         borrow_memory(param, memshared_instance, shape)
    ...     # acquire your dataset (either through some smart shared memory
    ...     # or by reloading it for each process)
    ...     dataset, dataset_labels = acquire_dataset()
    ...     # then run your model forward in this process
    ...     epochs = 20
    ...     for epoch in range(epochs):
    ...         myfunction.update_fun(dataset, dataset_labels)
See `borrow_all_memories` for list usage.
"""
logger.debug("%s" % shared_param.name)
param_value = num.frombuffer(memshared_instance).reshape(shape)
shared_param.set_value(param_value, borrow=True)
def borrow_all_memories(shared_params, memshared_instances):
"""
    Run `borrow_memory` on a list of params and shared memory
    sharedctypes.

Parameters
----------
shared_params : list
of :class:`pytensor.tensor.sharedvar.TensorSharedVariable`
the pytensor shared variable where
shared memory should be used instead.
memshared_instances : dict of tuples
of :class:`multiprocessing.RawArray` and their shapes
        the memory shared across processes (e.g. from `memshare_sparams`)
Notes
-----
Same as `borrow_memory` but for lists of shared memories and
pytensor variables. See `borrow_memory`
"""
for sparam in shared_params:
borrow_memory(sparam, *memshared_instances[sparam.name])
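
# Illustrative end-to-end sketch (not part of the original module): the parent
# process moves the pytensor shared variables of a compiled function into
# shared memory, and every forked child process then re-attaches them from the
# module-level `_shared_memory` dict. `myfunction` is assumed to be a compiled
# pytensor function exposing `get_shared()`, as in the docstring examples
# above.
def _example_parent_setup(myfunction):
    # parent: wrap the variables' memory in multiprocessing RawArrays
    memshare_sparams(myfunction.get_shared())


def _example_child_attach(myfunction):
    # child (after forking, e.g. through `paripool`): point the pytensor
    # variables back at the inherited shared memory
    borrow_all_memories(myfunction.get_shared(), _shared_memory)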