Processifying Bulky Functions in Python

Long-running Python scripts that repeatedly call “bulky” functions can see their memory usage grow over time. Other than calling gc.collect() and praying hard, can we do a bit better in dealing with the heavy memory footprint?

The problem with bulky functions

Real examples I have faced include scripts that evaluate the performance of many different combinations of features and models on a dataset:

def evaluate(features, model):
    # Load some big features
    # Train a big model
    # Measure the performance
    # After all the bulky work, return a number or a small dict, etc.
    return score

In a single Python script, repeatedly calling the above bulky function can cause the memory usage to grow over time, even though the calls are independent of each other. That is to say, some amount of lingering memory accumulates after every call to the function, as if there were an ongoing memory leak. This continues until one has to restart the script to trigger a “refresh” that frees up the memory.

Possible solutions

0. Find out where the memory leak is with a profiler

This is probably the only real solution to the problem. However, the effort needed might be too much for exploratory code that will not go into production. Sometimes, something more pragmatic is needed.

1. gc.collect()

This attempts to force the garbage collector to do its work, and is often preceded by some del ... statements. Unfortunately, there is no guarantee that the memory will actually be released.
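For reference, the del plus gc.collect() pattern looks roughly like the sketch below. Node, make_cycle and collect_after_cycle are invented for the example. gc.collect() returns the number of unreachable objects it found, which shows that the collector did run, but it says nothing about whether the interpreter handed the freed memory back to the operating system.

```python
import gc


class Node(object):
    """Tiny object used to build a reference cycle."""

    def __init__(self):
        self.other = None


def make_cycle():
    a, b = Node(), Node()
    a.other, b.other = b, a  # a <-> b keep each other alive
    return a


def collect_after_cycle():
    gc.collect()         # start from a clean slate
    node = make_cycle()
    del node             # drop the last outside reference
    return gc.collect()  # number of unreachable objects found


if __name__ == "__main__":
    print(collect_after_cycle())
```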

2. Invoke each execution of evaluate() in a new process, e.g. with a “driver” shell script

Now we are getting somewhere. The OS is forced to reclaim the memory used by the function when the process terminates. However, this requires a lot of extra boilerplate: a command-line interface and a shell script. Besides the annoying mental context switch between languages, this approach is, more importantly, only feasible for simple “end-user” functions such as the one in this example. In general, a bulky function that needs to be called periodically from within another function has to stay within the Python interpreter.

3. Spawn a new process in Python that will execute the function

Indeed, we can spawn a new process to execute the function using the multiprocessing module. This requires modifying the evaluate() function so that it takes a Queue object for communicating the result back. Perhaps preferably, we can use the Pool interface to handle the result communication:

from multiprocessing import Pool
pool = Pool(processes=1)
# apply() expects the positional arguments packed into a single tuple
result = pool.apply(evaluate, (features, model))

Looks acceptable. However, what if an exception is raised somewhere in the function? The error traceback now becomes

/usr/lib64/python2.7/multiprocessing/pool.pyc in apply(self, func, args, kwds)
    242         '''
    243         assert self._state == RUN
--> 244         return self.apply_async(func, args, kwds).get()
    246     def map(self, func, iterable, chunksize=None):

/usr/lib64/python2.7/multiprocessing/pool.pyc in get(self, timeout)
    565             return self._value
    566         else:
--> 567             raise self._value
    569     def _set(self, i, obj):

ValueError: operands could not be broadcast together with shapes (10,2) (10,20)

regardless of where the exception is actually raised. As one can guess, staring at the source code of the multiprocessing module is not particularly informative for debugging an error in user-level code [1]. Some further hacking is necessary to float the actual context up to the user on exception. Also, having to interact with the Pool object every time is somewhat distracting. In other words, this solution is close but not yet ideal.
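One more detail about the Pool route: by default a pool keeps its worker processes alive between tasks, so a long-lived pool would not by itself return memory after each call. The maxtasksperchild argument (available since Python 2.7) asks the pool to replace a worker after a given number of tasks. The sketch below assumes a POSIX system with the fork start method and Python 3; worker_pid and pids_for_calls are made-up helpers that report which process served each call.

```python
import multiprocessing as mp
import os


def worker_pid(_):
    """Return the PID of the process that ran this task."""
    return os.getpid()


def pids_for_calls(n_calls, maxtasksperchild=None):
    """Run n_calls tasks on a one-worker pool and collect the worker PIDs."""
    ctx = mp.get_context("fork")  # assumption: POSIX; fork is unavailable on Windows
    pool = ctx.Pool(processes=1, maxtasksperchild=maxtasksperchild)
    try:
        return [pool.apply(worker_pid, (i,)) for i in range(n_calls)]
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    # Number of distinct worker processes used for three calls.
    print(len(set(pids_for_calls(3))))                      # worker is reused
    print(len(set(pids_for_calls(3, maxtasksperchild=1))))  # fresh worker per call
```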

The processify decorator

Recently, I found a neat piece of code in the wild that solves this problem – the processify decorator. Full credit goes to schlamar who posted it on GitHub Gist. For completeness, I will show the (slightly modified) code here:

import os
import sys
import traceback
from functools import wraps
from multiprocessing import Process, Queue


def processify(func):
    '''Decorator to run a function as a process.
    Be sure that every argument and the return value
    is *picklable*.
    The created process is joined, so the code does not
    run in parallel.
    '''

    def process_func(q, *args, **kwargs):
        try:
            ret = func(*args, **kwargs)
        except Exception:
            # send the exception and its formatted traceback to the parent
            ex_type, ex_value, tb = sys.exc_info()
            error = ex_type, ex_value, ''.join(traceback.format_tb(tb))
            ret = None
        else:
            error = None

        q.put((ret, error))

    # register original function with different name
    # in sys.modules so it is picklable
    process_func.__name__ = func.__name__ + 'processify_func'
    setattr(sys.modules[__name__], process_func.__name__, process_func)

    @wraps(func)
    def wrapper(*args, **kwargs):
        q = Queue()
        p = Process(target=process_func, args=[q] + list(args), kwargs=kwargs)
        p.start()
        # read the result before joining to avoid a deadlock on large results
        ret, error = q.get()
        p.join()

        if error:
            ex_type, ex_value, tb_str = error
            # ex_value.message is Python 2 only; use str(ex_value) on Python 3
            message = '%s (in subprocess)\n%s' % (ex_value.message, tb_str)
            raise ex_type(message)

        return ret
    return wrapper

Note: In the original version, p.join() was called before the result was read from the queue, which risks a deadlock if the object returned from the target function is too large. The code above has been modified to avoid this, and it seems to have solved the problem. More information can be found in the comment section of the Gist.
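The ordering matters because a child that has put data on a Queue does not exit until that data has been flushed to the underlying pipe, so joining before reading can block both sides once the result exceeds the pipe buffer. Here is a minimal sketch of the safe order, assuming Python 3 with the fork start method on a POSIX system; produce and fetch_large_result are hypothetical helpers.

```python
import multiprocessing as mp


def produce(q, n_bytes):
    """Child: put a payload of n_bytes on the queue."""
    q.put(b"x" * n_bytes)


def fetch_large_result(n_bytes):
    """Parent: read the result first, then join the child."""
    ctx = mp.get_context("fork")  # assumption: POSIX-only start method
    q = ctx.Queue()
    p = ctx.Process(target=produce, args=(q, n_bytes))
    p.start()
    payload = q.get()  # drain the queue so the child can finish flushing
    p.join()           # safe now: nothing is left in the pipe
    return payload


if __name__ == "__main__":
    print(len(fetch_large_result(5 * 1024 * 1024)))
```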

To use it, simply decorate the bulky function:

@processify
def evaluate(features, model):
    # Load some big features
    # Train a big model
    # Measure the performance
    # After all the bulky work, return a number or a small dict, etc.
    return score

and the function will be run cleanly in a new process, as long as the input and output of the function are picklable.

Very nice! What I like about it is:

  1. Very easy to use, no modification to the original function required (in fact, this is probably a perfect use case of a decorator)
  2. A trick that seems to successfully pickle a nested function
  3. Able to show some useful context on exception as opposed to the case above using Pool.

Regarding point (3) above, consider the following code:

@processify
def work():
    """Get things done here"""
    import numpy as np
    # mismatched shapes: this line raises a ValueError inside the subprocess
    np.random.rand(10,2) + np.random.rand(10,20)
    return np.random.rand(10,2)

if __name__ == '__main__':
    work()

The error traceback is shown as:

ValueError                                Traceback (most recent call last)
<ipython-input-3-c2d43235dcae> in <module>()
     67 if __name__ == '__main__':
---> 68     work()

<ipython-input-3-c2d43235dcae> in wrapper(*args, **kwargs)
     43             ex_type, ex_value, tb_str = error
     44             message = '%s (in subprocess)\n%s' % (ex_value.message, tb_str)
---> 45             raise ex_type(message)
     47         return ret

ValueError: operands could not be broadcast together with shapes (10,2) (10,20)  (in subprocess)
  File "<ipython-input-3-c2d43235dcae>", line 18, in process_func
    ret = func(*args, **kwargs)
  File "<ipython-input-3-c2d43235dcae>", line 55, in work
    np.random.rand(10,2) + np.random.rand(10,20)

which is quite helpful.

As long as a bulky function is called repeatedly in a coarse-grained manner (i.e. the time spent in a single execution of the function dominates the caller’s loop) and has lightweight input and output, processifying it is a sensible way to keep memory usage under control.
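The “lightweight input and output” condition matters because results (and, depending on the start method, arguments too) cross the process boundary as pickled bytes. A quick way to get an approximate feel for that cost, using the made-up helper payload_size:

```python
import pickle


def payload_size(obj):
    """Approximate number of bytes needed to ship obj between processes."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


if __name__ == "__main__":
    print(payload_size({"score": 0.93}))       # a small result dict
    print(payload_size(list(range(1000000))))  # a large intermediate object
```

If the second kind of object is what the function returns, the cost of serializing and copying it on every call may eat into the benefit of running in a separate process.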

I’ve added this to my standard utility toolbox for quick data analytics projects, and it just saved me a time or two in a recent competition which I’ll probably write about soon.

No more calling gc.collect() blindly!

  1. This issue seems to have been fixed in Python 3.4: see the bug tracker