Processifying Bulky Functions in Python
Long-running Python scripts that repeatedly call “bulky” functions can see their memory usage grow over time. Other than calling gc.collect() and praying hard, can we do a bit better in dealing with the heavy memory footprint?
The problem with bulky functions
Some real examples I have faced include scripts that evaluate the performance of many different combinations of features and models on a dataset:
```python
def evaluate(features, model):
    # Load some big features
    # Train a big model
    # Measure the performance
    # After all the bulky work, return a number or a small dict, etc.
    return score
```
In a single Python script, repeatedly calling this bulky function can cause memory usage to grow over time, even though the calls are entirely independent of each other. That is to say, some amount of lingering memory accumulates after every call, as if there were an ongoing memory leak. This continues until one has to restart the script to trigger a “refresh” that frees up the memory.
0. Find out where the memory leak is with a profiler
This is probably the only real solution to the problem. However, the effort needed might be too much for exploratory code that will not go into production. Sometimes, something more pragmatic is needed.
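For completeness, modern Python ships a profiler for exactly this in the standard library: tracemalloc (available since Python 3.4). A minimal sketch, with the leak simulated by a deliberately retained list:

```python
import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()

# Simulate ~1 MB of leaked allocations that stay referenced
leaked = [bytearray(10000) for _ in range(100)]

after = tracemalloc.take_snapshot()
stats = after.compare_to(before, 'lineno')
print(stats[0])  # the source line responsible for the biggest growth
```

Diffing two snapshots points at the source line that allocated the growth, which is usually enough to start hunting.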
1. Call gc.collect() and pray hard
This attempts to force the garbage collector to do its work, and is often preceded by some del ... statements. Unfortunately, there is no guarantee that the memory will actually be released.
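The pattern usually looks something like this (a toy sketch; whether the OS actually gets the memory back is another matter entirely):

```python
import gc


def evaluate_once():
    big = [0] * 10**6          # stand-in for big features / a big model
    score = len(big) / 10**6   # the small result we actually want
    del big                    # drop our reference to the bulky data...
    gc.collect()               # ...and ask the collector to sweep right now
    return score
```

Even when gc.collect() reclaims the objects, CPython's allocator may hold on to the freed pages, so the process's resident memory often does not shrink.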
2. Invoke each execution of evaluate() in a new process, e.g. with a “driver” shell script
Now we are getting somewhere. The OS is forced to clean up the memory used by the function when the process terminates. However, this requires a lot of extra boilerplate – a command line interface and a shell script. Besides the annoying mental context switching between languages, more importantly this approach is only feasible for a simple “end-user” function such as the one shown in this example. Generally, for a bulky function that needs to be called periodically within another function, we need to stay within the Python interpreter.
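For a sense of the boilerplate involved, the driver pattern might be sketched as follows – here kept in Python via subprocess rather than a shell script, with a trivial inline child standing in for the hypothetical evaluate CLI:

```python
import json
import subprocess
import sys

# Inline stand-in for a hypothetical evaluate_cli.py; a real driver would
# pass feature/model names and the child would print its score as JSON.
CHILD = "import json, sys; print(json.dumps({'score': float(sys.argv[1])}))"

scores = []
for combo in ["0.5", "0.7"]:
    # Each run gets a fresh interpreter; the OS reclaims everything on exit.
    out = subprocess.check_output([sys.executable, "-c", CHILD, combo])
    scores.append(json.loads(out.decode())["score"])
```

The per-call process cleanup is exactly what we want, but the serialization of arguments and results into command lines and stdout is the boilerplate tax.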
3. Spawn a new process in Python that will execute the function
Indeed, we can spawn a new process to execute the function using the multiprocessing module. This requires modifying the evaluate() function so that it takes in a Queue object for communicating back the result. Perhaps more preferably, we can make use of the Pool interface to help with the result communication:
```python
from multiprocessing import Pool

pool = Pool(processes=1)
result = pool.apply(evaluate, (features, model))
```
Looks acceptable. However, what if an exception is raised somewhere in the function? The error traceback now becomes
```
/usr/lib64/python2.7/multiprocessing/pool.pyc in apply(self, func, args, kwds)
    242         '''
    243         assert self._state == RUN
--> 244         return self.apply_async(func, args, kwds).get()
    245
    246     def map(self, func, iterable, chunksize=None):

/usr/lib64/python2.7/multiprocessing/pool.pyc in get(self, timeout)
    565             return self._value
    566         else:
--> 567             raise self._value
    568
    569     def _set(self, i, obj):

ValueError: operands could not be broadcast together with shapes (10,2) (10,20)
```
regardless of where the exception was actually raised. As one can guess, staring at the source code of the multiprocessing module is not particularly informative for debugging an error in user-level code. Some further hacking is necessary to float the actual context up to the user on exception. Also, having to interact with the Pool object every time is somewhat distracting. In other words, this solution is close, but not yet ideal.
Recently, I found a neat piece of code in the wild that solves this problem – the processify decorator. Full credit goes to schlamar, who posted it on GitHub Gist. For completeness, I will show the (slightly modified) code here:
```python
import os
import sys
import traceback
from functools import wraps
from multiprocessing import Process, Queue


def processify(func):
    '''Decorator to run a function as a process.

    Be sure that every argument and the return value is *picklable*.
    The created process is joined, so the code does not run in parallel.
    '''

    def process_func(q, *args, **kwargs):
        try:
            ret = func(*args, **kwargs)
        except Exception:
            ex_type, ex_value, tb = sys.exc_info()
            error = ex_type, ex_value, ''.join(traceback.format_tb(tb))
            ret = None
        else:
            error = None

        q.put((ret, error))

    # register original function with a different name
    # in sys.modules so it is picklable
    process_func.__name__ = func.__name__ + 'processify_func'
    setattr(sys.modules[__name__], process_func.__name__, process_func)

    @wraps(func)
    def wrapper(*args, **kwargs):
        q = Queue()
        p = Process(target=process_func, args=[q] + list(args), kwargs=kwargs)
        p.start()
        ret, error = q.get()
        p.join()

        if error:
            ex_type, ex_value, tb_str = error
            message = '%s (in subprocess)\n%s' % (ex_value.message, tb_str)
            raise ex_type(message)

        return ret
    return wrapper
```
Note: In the original version, calling p.join() before reading from the queue risks a deadlock if the object returned from the target function is too large. The code above has been modified accordingly and seems to have solved the problem. More information can be found in the comments section of the Gist.
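The ordering matters because the child feeds the queue through a pipe with a bounded buffer: if the parent joins before draining the queue, a large payload can leave the child blocked on the write while the parent blocks on join(). A minimal sketch of the safe ordering (producer and payload size are illustrative):

```python
from multiprocessing import Process, Queue


def producer(q):
    # A payload comfortably larger than the pipe buffer
    q.put(list(range(500000)))


def run():
    q = Queue()
    p = Process(target=producer, args=(q,))
    p.start()
    data = q.get()   # drain the queue BEFORE joining...
    p.join()         # ...otherwise join() can deadlock on a large payload
    return len(data)
```

Swapping the q.get() and p.join() lines is exactly the trap the note above describes.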
To use it, simply decorate the bulky function:
```python
@processify
def evaluate(features, model):
    # Load some big features
    # Train a big model
    # Measure the performance
    # After all the bulky work, return a number or a small dict, etc.
    return score
```
and the function will be run cleanly in a new process, as long as the input and output of the function are picklable.
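To see that each call really does land in a throwaway process, here is a condensed variant of the same idea (error forwarding stripped out, and assuming the fork start method – the default on Linux – since the nested target is not picklable; handling that is precisely why the full version above registers process_func in sys.modules):

```python
import os
from functools import wraps
from multiprocessing import Process, Queue


def processify(func):
    """Simplified sketch: run func in a throwaway process, no error forwarding."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        q = Queue()

        def target():
            q.put(func(*args, **kwargs))

        p = Process(target=target)
        p.start()
        ret = q.get()
        p.join()
        return ret
    return wrapper


@processify
def worker_pid():
    # Runs in the child, so this pid differs from the parent's
    return os.getpid()
```

Calling worker_pid() returns a pid different from os.getpid() in the parent – every call gets a fresh process whose memory the OS reclaims on exit.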
Very nice! What I like about it is:
1. Very easy to use – no modification to the original function is required (in fact, this is probably a perfect use case for a decorator)
2. A trick that seems to successfully pickle a nested function
3. Able to show some useful context on exception, as opposed to the Pool case above
Regarding point (3) above, consider the following code:
```python
@processify
def work():
    """Get things done here"""
    import numpy as np
    np.random.rand(10, 2) + np.random.rand(10, 20)
    return np.random.rand(10, 2)


if __name__ == '__main__':
    work()
```
The error traceback is shown as:
```
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-3-c2d43235dcae> in <module>()
     66
     67 if __name__ == '__main__':
---> 68     work()

<ipython-input-3-c2d43235dcae> in wrapper(*args, **kwargs)
     43             ex_type, ex_value, tb_str = error
     44             message = '%s (in subprocess)\n%s' % (ex_value.message, tb_str)
---> 45             raise ex_type(message)
     46
     47         return ret

ValueError: operands could not be broadcast together with shapes (10,2) (10,20) (in subprocess)
  File "<ipython-input-3-c2d43235dcae>", line 18, in process_func
    ret = func(*args, **kwargs)
  File "<ipython-input-3-c2d43235dcae>", line 55, in work
    np.random.rand(10,2) + np.random.rand(10,20)
```
which is quite helpful.
As long as a bulky function is called repeatedly in a coarse-grained manner (i.e. the time spent in a single execution dominates the caller’s loop) and has lightweight, picklable input and output, this processifying trick is a sensible way to keep memory usage under control.
I’ve added it to my standard utility toolbox for quick data analytics projects, and it has already saved me a time or two in a recent competition, which I’ll probably write about soon.