Mejorando el rendimiento en Python
Todo el código fuente de este post está disponible en github.
Tengo predilección por Python. Para mí, es un gran lenguaje para crear prototipos en muchas áreas. Para mi trabajo de investigación, suelo crear/diseñar algoritmos de optimización continua utilizando algoritmos evolutivos. Para estos algoritmos, lenguajes como C/C++ o Java son muy utilizados, especialmente por su buen rendimiento (para publicar, es habitual tener que hacer muchas comparaciones entre algoritmos, por lo que el rendimiento puede ser crítico. Sin embargo, para probar nuevas ideas, muchos autores utilizan otras herramientas como Mathlab que reduce el tiempo de desarrollo a costa de un mayor tiempo de computación.
Estoy de acuerdo en que Mathlab es genial para los algoritmos numéricos, pero sigo prefiriendo Python sobre Mathlab, porque me siento más cómodo con él, y tengo muchas bibliotecas, y es más sencillo llamar a código en otros lenguajes, escrito en C o Java. Eso nos permite aumentar el rendimiento, y me gusta probar cuánto se puede mejorar.
Hace varios meses, empecé a escribir mi algoritmo más exitoso, Algoritmos Meméticos basados en el Encadenamiento LS, en Python. Tenía varias dudas sobre el rendimiento, así que empiezo a escribir un elemento, un Algoritmo Genético de Estado Estable, en Python.
Llamando a código C/C++ desde python
El primer reto que tuve que afrontar fue permitir que mi programa en python utilizara las mismas funciones de referencia que otras implementaciones, CEC'2005 benchmark. Este benchmark define las funciones a optimizar, por lo que su función principal es evaluar mis soluciones, cuando cada solución es un vector de números reales, con un valor real de fitness. El código del benchmark fue implementado (por sus autores) en C/C++. Por lo tanto, mi código python tiene que llamar al código C++.
Para ello, he utilizado la librería boost::python, que es, en mi opinión, la forma más sencilla de llamar al código C/C++, especialmente cuando utilizamos el paquete numpy.
En mi caso, es muy sencillo, porque necesito pocas funciones:
#include <boost/python.hpp>
#include <boost/python/numeric.hpp>
#include <boost/python/list.hpp>
#include <iostream>
#include "cec2005/cec2005.h"
#include "cec2005/srandom.h"
using namespace boost::python;
Random r(new SRandom(12345679));
void set_function(int fun, int dim) {
init_cec2005(&r, fun, dim);
}
double evalua(const numeric::array &el) {
const tuple &shape = extract<tuple>(el.attr("shape"));
unsigned n = boost::python::extract<unsigned>(shape[0]);
double *tmp = new double[n];
for(unsigned int i = 0; i < n; i++)
{
tmp[i] = boost::python::extract<double>(el[i]);
}
double result = eval_cec2005(tmp, n);
delete tmp;
return result;
}
...
BOOST_PYTHON_MODULE(libpycec2005)
{
using namespace boost::python;
numeric::array::set_module_and_type( "numpy", "ndarray");
def("config", &set_function);
def("evaluate", &evalua);
...
}
Más información en la buena documentación de boost::python.
Uno puede llamar al código C/C++, hemos implementado el algoritmo en código python. El código de prueba fue el siguiente:
from ssga import SSGA
from readargs import ArgsCEC05
import libpycec2005 as cec2005
import numpy
def check_dimension(option, opt, value):
if value not in [2, 10, 30, 50]:
raise OptionValueError(
"option %s: invalid dimensionality value: %r" % (opt, value))
def main():
"""
Main program
"""
args = ArgsCEC05()
if args.hasError:
args.print_help_exit()
fun = args.function
dim = args.dimension
print "Function: %d" %fun
print "Dimension: %d" %dim
cec2005.config(fun, dim)
domain = cec2005.domain(fun)
print "Domain: ", domain
ea = SSGA(domain=domain, size=60, dim=dim, fitness=cec2005.evaluate)
for x in xrange(25):
ea.run(maxeval=dim*10000)
[bestsol, bestfit] = ea.getBest()
print "BestSol: ", bestsol
print "BestFitness: %e" %bestfit
ea.reset()
if __name__ == "__main__":
main()
Este código fuente ejecuta el algoritmo 25 veces, y en cada ejecución el algoritmo se detiene cuando se crean 10000*dim soluciones. Estas condiciones están indicadas en la especificación del benchmark. El único parámetro fue la función (-f, se utilizó la función 1 por por defecto) y la dimensión (-d) de 10, 30, 50.
Perfilando el tiempo de computación
¿Cuánto tiempo tarda? He cambiado xrange(25) por xrange(1) y hemos ejecutado su versión actual. El tiempo final fue de 7 minutos para la dimensión 10, y 21 minutos para la dimensión 30 (para una sola función).
Como me gusta hacer cosas más interesantes, que sólo esperar el tiempo de computación, yo uso el perfil, sólo una ejecución para la función, para detectar las funciones/método más costosos en tiempo de computación.
python -m cProfile runcec.py -f 1 -d 10
The output was the following:
2943600 function calls (2943531 primitive calls) in 31.031 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
....
1 0.001 0.001 0.126 0.126 ssga.py:1(<module>)
99940 0.561 0.000 17.463 0.000 ssga.py:109(cross)
1 0.000 0.000 0.000 0.000 ssga.py:123(reset)
1 5.559 5.559 51.129 51.129 ssga.py:126(run)
1 0.000 0.000 0.000 0.000 ssga.py:14(__init__)
1 0.000 0.000 0.000 0.000 ssga.py:158(getBest)
1 0.000 0.000 0.000 0.000 ssga.py:31(set_mutation_rate)
99940 0.730 0.000 1.885 0.000 ssga.py:45(mutation)
12438 0.286 0.000 0.758 0.000 ssga.py:50(mutationBGA)
1 0.002 0.002 0.002 0.002 ssga.py:77(initPopulation)
105883 1.101 0.000 5.604 0.000 ssga.py:89(updateWorst)
1 0.000 0.000 0.000 0.000 ssga.py:9(SSGA)
99940 1.049 0.000 20.617 0.000 ssga.py:97(getParents)
...
Con el perfil podemos observar los métodos más costosos de nuestro código: getParents (20 segundos), operador de cruce (17 segundos) y updateWorst (5 segundos). Estos métodos suponen el 85% del tiempo de computación, y los dos primeros el 74% del tiempo de cálculo.
Optimización del código
Eso demuestra que la mayor parte del tiempo de computación se debe a una minoría del código, sólo tres métodos. Si podemos optimizar estos métodos, nuestro código podría mejorado mucho.
Podemos utilizar de nuevo el paquete boost::python, pero es un poco tedioso utilizarlo. Por lo tanto, hemos utilizado el paquete cython. Con cython podemos optimizar el código fuente añadiendo información sobre los tipos.
Por ejemplo, en lugar del siguiente código
import numpy as np
def distance(ind1,ind2):
"""
Euclidean distance
ind1 -- first array to compare
ind2 -- second array to compare
Return euclidean distance between the individuals
>>> from numpy.random import rand
>>> import numpy as np
>>> dim = 30
>>> sol = rand(dim)
>>> distance(sol,sol)
0.0
>>> ref=np.zeros(dim)
>>> dist=distance(sol,ref)
>>> dist > 0
True
>>> dist2 = distance(sol*2,ref)
>>> 2*dist == dist2
True
"""
dif = ind1-ind2
sum = (dif*dif).sum()
return math.sqrt(sum)
Podemos escribir:
cimport numpy as np
cimport cython
DTYPE = np.double
ctypedef np.double_t DTYPE_t
ctypedef np.int_t BTYPE_t
def distance(np.ndarray[DTYPE_t, ndim=1]ind1, np.ndarray[DTYPE_t, ndim=1] ind2):
"""
Euclidean distance
ind1 -- first array to compare
ind2 -- second array to compare
....
"""
cdef np.ndarray[DTYPE_t, ndim=1] dif = ind1-ind2
cdef double sum = (dif*dif).sum()
return math.sqrt(sum)
Podemos ver que sigue siendo muy legible. sólo hemos puesto información sobre el tipo y dimensión en los parámetros del vector y sobre las variables, usando la palabra clave cdef.
Veamos como ejemplo el primer método, el operador de cruce, implementado en el método crossBLX:
import numpy as np
import math
def crossBLX(mother,parent,domain,alpha):
"""
crossover operator BLX-alpha
mother -- mother (first individual)
parent -- parent (second individual)
domain -- domain to check
alpha -- parameter alpha
Returns the new children following the expression children = random(x-alpha*dif, y+alpha*dif),
where dif=abs(x,y) and x=lower(mother,parents), y=upper(mother,parents)
>>> import numpy as np
>>> low=-5
>>> upper = 5
>>> dim=30
>>> sol = np.array([1,2,3,2,1])
>>> crossBLX(sol,sol,[low,upper],0)
array([ 1., 2., 3., 2., 1.])
"""
diff = abs(mother-parent)
dim = mother.size
I=diff*alpha
points = np.array([mother,parent])
A=np.amin(points,axis=0)-I
B=np.amax(points,axis=0)+I
children = np.random.uniform(A,B,dim)
[low,high]=domain
return np.clip(children, low, high)
Podemos ver que es muy sencillo de implementar usando numpy, pero sigue siendo muy lento. Con cython he definido implementar directamente las numerosas operaciones, el siguiente código:
def crossBLX(np.ndarray[DTYPE_t, ndim=1] mother,np.ndarray[DTYPE_t, ndim=1] parent,list domain, double alpha):
"""
...
"""
cdef np.ndarray[DTYPE_t, ndim=1] C, r
cdef int low, high, dim
cdef double x, y
cdef double I, A, B
cdef unsigned i
[low,high]=domain
dim = mother.shape[0]
C = np.zeros(dim)
r = random.randreal(0,1,dim)
for i in range(dim):
if mother[i] < parent[i]:
(x,y) = (mother[i],parent[i])
else:
(y,x) = (mother[i],parent[i])
I = alpha*(y-x)
A=x-I
B=y+I
if (A < low):
A = low
if (B > high):
B = high
C[i] = A+r[i]*(B-A)
return C
Es cierto que el código fuente es más complicado, pero sigue siendo muy legible. He comparado las dos implementaciones para asegurarme de que ambas devuelven los mismos valores.
Midiendo la mejora
¿Cuánto cuestan estos pequeños cambios en el código? He vuelto a perfilar el código fuente y me da:
1020045 function calls (1019976 primitive calls) in 18.003 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
....
1 0.001 0.001 0.127 0.127 ssga.py:1(<module>)
99940 0.425 0.000 2.432 0.000 ssga.py:109(cross)
1 0.000 0.000 0.000 0.000 ssga.py:123(reset)
1 5.415 5.415 17.864 17.864 ssga.py:126(run)
1 0.000 0.000 0.000 0.000 ssga.py:14(__init__)
1 0.000 0.000 0.000 0.000 ssga.py:158(getBest)
1 0.000 0.000 0.000 0.000 ssga.py:31(set_mutation_rate)
99940 0.699 0.000 2.006 0.000 ssga.py:45(mutation)
12544 0.338 0.000 0.929 0.000 ssga.py:50(mutationBGA)
1 0.002 0.002 0.002 0.002 ssga.py:77(initPopulation)
105959 0.775 0.000 1.343 0.000 ssga.py:89(updateWorst)
1 0.000 0.000 0.000 0.000 ssga.py:9(SSGA)
99940 0.940 0.000 6.665 0.000 ssga.py:97(getParents)
....
Podemos ver la mejora obtenida.
Method | Python | Cython |
---|---|---|
cross : | 17.4 | 2.4 |
getParents : | 20.6 | 6.6 |
updateWorst : | 5.6 | 1.3 |
Total | 43.6 | 10.3 |
El nuevo código tarda sólo un 23% del tiempo de cálculo del código anterior. Con estos cambios, hemos reducido el tiempo total de 51 segundos a 18 de código.
En perspectiva
Ahora, ejecuto el código fuente sin el perfil, y pruebo el código fuente obteniendo el siguiente tiempo:
Method | dim=10 | dim=30 | dim=50 |
---|---|---|---|
Python | 44s | 3240s (54m) | -- |
Cython | 10s | 28s | 48s |
Improvement | 77% | 99% | --- |
En la siguiente tabla, comprobamos el tiempo máximo para una y 25 ejecuciones (el tiempo depende de la función utilizada).
#functions | dim=10 | dim=30 | dim=50 |
---|---|---|---|
1 | 10s/18s | 28s/40s | 48s/1m |
25 | 3/7m | 15/21m | 38m/ |
Así, el tiempo total de cálculo es de 7 minutos para la dimensión 10, y de 21 minutos para la dimensión 30. Estas cifras son muy aceptables, especialmente porque podemos probar en paralelo las diferentes funciones en un cluster de ordenadores. Desgraciadamente, una implementación en Mathlab no sólo llevaría más tiempo, sino que sino que además, por razones de licencia, no podría ejecutarse en paralelo sin límite.
En resumen, podemos utilizar código python, no sólo para crear prototipos experimentales, sino también para crear prototipos funcionales.
Y sobre el posible problema de las pruebas, he estado trabajando en ello, pero creo que es suficiente para un post, ¿no? :-)
Todo el código referido en el post, tanto en python como en cython, está disponible en github, por si quieres verlo.