julio 28, 2021
~ 4 MIN
Python Mutliprocessing
< Blog RSSPython Multprocessing
En este post vamos a aprender a ejectuar nuestro código en Python
uando todos los cores
de nuestra CPU
. De esta manera, tareas que normalmente ejecutaríamos de manera secuencial y podrían llevar mucho tiempo las podremos paralelizar consiguiendo una gran mejora.
Empezaremos ilustrando la gran potencia de este método en un ejemplo sencillo que consiste en leer un conjunto de imágenes de satélite de una carpeta, convertirlas a PNG
usando las bandas RGB
y guardar el resultado en otra carpeta. Para ello usaremos las imágenes del dataset EuroSAT, usado en los posts anteriores.
import glob
path = './data'
images = glob.glob(f'{path}/*/*.tif')
len(images)
27000
import skimage.io as io
import numpy as np
def read_ms(img):
ms = io.imread(img)
# las imágenes originales tienen 13 bandas
assert ms.shape[2] == 13
return ms
def get_rgb(ms):
# nos quedamos con 3 bandas (RGB) y normalizamos
return (255 * (ms[...,(3,2,1)] / 4000).clip(0,1)).astype(np.uint8)
dest_folder = f'{path}/results'
def save_png(name, img, sep="\\"):
img_name = name.split(sep)[-1][:-4]
file_path = f'{dest_folder}/{img_name}.png'
io.imsave(file_path, img)
return file_path
from tqdm import tqdm
import warnings
warnings.simplefilter("ignore") # se queja que hay imágenes con bajo contraste
De la siguiente manera llevamos a cabo nuestro procesado simple de manera secuenciual.
for img in tqdm(images):
ms = read_ms(img)
rgb = get_rgb(ms)
save_png(img, rgb)
100%|██████████| 27000/27000 [01:12<00:00, 370.25it/s]
Como puedes ver llevar a cabo esta tarea tarda un poco más de un minuto, y eso que las imágenes son relativamente pequeñas y pocas. Vamos a acelerar el procesado !
El módulo concurrent.futures
En el módulo concurrent futures
encontramos la funcionalidad que ofrece Python
para el procesado en paralelo. Básicamente tenemos dos alternativas: usar el objeto ThreadPoolExecutor
o ProcessPoolExecutor
. En el primer caso, Python
intentará ejectuar nuestro código en diferentes threads
mientras que en el segundo usará los cores
físicos de nuestra CPU
. En función de la aplicación, una alternativa puede ser más ventajosa sobre la otra dependiendo de si el cuello de botella se encuentra en el procesado en sí o en el I/O
.
def generate_rgb(img):
ms = read_ms(img)
rgb = get_rgb(ms)
save_png(img, rgb)
El siguiente código es capaz de llevar a cabo el mismo procesado pero de forma paralela. Pudes cambiar el Executor
para comparar las diferentes alternativas.
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
executor.map(generate_rgb, images)
El mismo procesado ahora ha terminado en 30 segundos, más de la mitad ! En algunos casos puede ser interesante añadir también una barra de progreso.
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
num_cores = multiprocessing.cpu_count()
with ThreadPoolExecutor(max_workers=num_cores) as pool:
with tqdm(total=len(images)) as progress:
futures = []
for img in images:
future = pool.submit(generate_rgb, img)
future.add_done_callback(lambda p: progress.update())
futures.append(future)
for future in futures:
future.result()
100%|██████████| 27000/27000 [00:31<00:00, 861.86it/s]
Es posbile recuperar los resultados devueltos por nuestra función de la siguiente manera.
def generate_rgb2(img):
ms = read_ms(img)
rgb = get_rgb(ms)
# ahora la función devuelve el path de la nueva imágen creada
return save_png(img, rgb)
with ThreadPoolExecutor(max_workers=num_cores) as pool:
with tqdm(total=len(images)) as progress:
futures = []
for img in images:
future = pool.submit(generate_rgb2, img)
future.add_done_callback(lambda p: progress.update())
futures.append(future)
# guardamos los resultados
results = []
for future in futures:
result = future.result()
results.append(result)
100%|██████████| 27000/27000 [00:31<00:00, 867.57it/s]
len(results)
27000
results[:3]
[('./data/results/AnnualCrop_1.png', 'hola'),
('./data/results/AnnualCrop_10.png', 'hola'),
('./data/results/AnnualCrop_100.png', 'hola')]
Y por último vamos a ver un ejemplo de como podemos enviar varios argumentos a nuestra función. Para ello crearemos una lista en la que cada elemento será una tupla con todos los argumentos necesarios para llevar a cabo la función. En el cuerpo de la función, recuperamos los argumentos individuales de la tupla como puedes ver a continuación.
def generate_rgb3(args):
img, a, b, c = args # sacamos los argumentos de la tupla
ms = read_ms(img)
rgb = get_rgb(ms)
return save_png(img, rgb)
args = [(img, 1, 2, 3) for img in images] # lista de tuplas con argumentos
with ThreadPoolExecutor(max_workers=num_cores) as pool:
with tqdm(total=len(images)) as progress:
futures = []
for arg in args:
future = pool.submit(generate_rgb3, arg) # enviamos la tupla de argumentos
future.add_done_callback(lambda p: progress.update())
futures.append(future)
results = []
for future in futures:
result = future.result()
results.append(result)
100%|██████████| 27000/27000 [00:31<00:00, 867.57it/s]
Resumen
En este post hemos aprendido a usar las herramientas disponibles en Python
para acelerar nuestro código ejecutando tareas en paralelo. La próxima vez que te encuentres llevando a cabo operaciones de manera secuencial y ésto te lleve mucho tiempo, si es posible considera paralelizar las tareas para sacar el máximo partido a tu CPU
. Casos de uso ideales son el procesado imágenes, comprimir o descomprimir archivos, etc.