How to use multiprocessing for a big 3D image stack in Python?

I have a 3D image stack (4000×2048×2048), and I want to apply some operation to every single 2D slice (2048×2048), e.g. Gaussian filtering, image enhancement, resizing, etc.

import numpy as np
from tifffile import imread,imwrite
import multiprocessing as mp
import cv2

def gaussian_blur_2d(img):
    blur = cv2.GaussianBlur(img,(5,5),0) 
    return blur

file_path = r"F:\Ctest\123.tif" #raw string, so backslashes are not treated as escape sequences
img = imread(file_path)
for i in range(img.shape[0]):
    img[i, :, :] = gaussian_blur_2d(img[i, :, :])


How can I accelerate the for loop using multiprocessing? My idea is to split the raw image stack into four or eight parts and apply pool.map to the split stacks. But how can I combine the split processing results back into a final full stack? I do not want to write the split stacks to disk, since that adds extra I/O time, and in my experience pool.map raises an error on return when the split stacks are too big.

On the other hand, I've tried just sticking the multidimensional array into mp.Array, which gives me TypeError: only size-1 arrays can be converted to Python scalars.

Answer

As I mentioned in the comments, making all that data accessible to several worker processes is the biggest challenge here, because one of the key tenets of multiprocessing is that generally no memory is shared between processes. Therefore we must explicitly ask the operating system for a chunk of memory that *is* shared between the processes, and create our numpy array on top of that chunk. Beyond that it's just a little multiprocessing housekeeping, which is pretty standard and well explored in other tutorials and examples.
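If the pattern is unfamiliar, here is a minimal, self-contained sketch of just the shared-memory part, with a tiny 4×8×8 stack standing in for the real 4000×2048×2048 one and an in-place doubling standing in for the Gaussian blur:

```python
import numpy as np
from multiprocessing import Process, shared_memory

def double_slice(shm_name, shape, index):
    #child: attach to the existing shared block by name and modify one 2D slice in place
    shm = shared_memory.SharedMemory(name=shm_name)
    stack = np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
    stack[index] *= 2
    shm.close()

def run_demo():
    shape = (4, 8, 8)  #tiny stand-in for the 4000x2048x2048 stack
    shm = shared_memory.SharedMemory(create=True, size=int(np.prod(shape)))
    stack = np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
    stack[:] = 1  #fill with known data
    p = Process(target=double_slice, args=(shm.name, shape, 2))
    p.start()
    p.join()
    result = stack.copy()  #copy out before releasing the shared block
    shm.close()
    shm.unlink()
    return result

if __name__ == "__main__":
    out = run_demo()
    print(out[2, 0, 0], out[0, 0, 0])  #slice 2 was doubled by the child; slice 0 is untouched
```

Because the child attaches to the block by name rather than receiving the array itself, nothing is pickled or copied between processes; the parent sees the child's writes as soon as it joins.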

import numpy as np
from multiprocessing import Process, shared_memory, Queue, cpu_count
from queue import Empty
import cv2

class STOPFLAG: pass #a simple sentinel telling a worker to stop

def worker_process(in_q, shm_name):
    shm = shared_memory.SharedMemory(name=shm_name) #attach to the existing block created by the parent process
    img_stack = np.ndarray([4000, 2048, 2048], dtype="uint8", buffer=shm.buf) #wrap a numpy array around the shared memory
    while True: #until the worker runs out of work
        try:
            task = in_q.get(timeout=1) #the first positional argument of Queue.get is `block`, so the timeout must be passed by keyword; don't wait forever on anything if you can help it
        except Empty: #multiprocessing.Queue raises the Empty exception from the standard queue library
            print("assuming all tasks are done. worker exiting...") #assume that waiting a while means no more tasks (we shouldn't hit this, but it prevents the child hanging if a crash happens elsewhere)
            break
        if isinstance(task, STOPFLAG):
            print("got stop flag. worker exiting...")
            break
        
        #process the image slice (no mutexes are needed because no two workers will ever get the same index to work on at the same time)
        img_stack[task] = cv2.GaussianBlur(img_stack[task],(5,5),0) 
        
    shm.close() #cleanup after yourself (close the local copy. This does not close the copy in the other processes)

if __name__ == "__main__": #this is needed with multiprocessing

    #create shared memory space where numpy will work from
    shm = shared_memory.SharedMemory(create=True, size=4000*2048*2048) #roughly 16 GB for uint8 data; the OS may have a hard time allocating a block this big
    #create the numpy array from the allocated memory
    img_stack = np.ndarray([4000, 2048, 2048], dtype="uint8", buffer=shm.buf)
    
    #Here is where you would load the image data into the img_stack array. It starts out with whatever data was previously in that RAM, similar to numpy.empty.
    
    #create a queue to send workers tasks (image index to work on)
    in_q = Queue()
    
    #create a couple worker processes
    processes = [Process(target=worker_process, args = (in_q, shm.name)) for _ in range(cpu_count())]
    for p in processes:
        p.start()
    
    #fill up the task queue with image indices that need computation
    for i in range(4000):
        in_q.put(i)
        
    #send a stop signal for each worker
    for _ in processes:
        in_q.put(STOPFLAG())
        
    #wait for all children to finish
    for p in processes:
        p.join()
        
    #do something (save?) with the img_stack
    np.save("processed_images.npy", img_stack)
    
    shm.close() #cleanup
    shm.unlink() #unlink is called only once after the last instance has been "close()"d
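If you would rather keep the pool.map interface from your question, the same shared-memory trick works with a multiprocessing.Pool whose initializer attaches each worker to the block exactly once. A sketch, again with a tiny stack and a simple inversion standing in for cv2.GaussianBlur:

```python
import numpy as np
from multiprocessing import Pool, shared_memory, cpu_count

_shm = None    #per-worker globals, set once by the initializer
_stack = None

def init_worker(shm_name, shape):
    #attach this worker to the shared block exactly once
    global _shm, _stack
    _shm = shared_memory.SharedMemory(name=shm_name)
    _stack = np.ndarray(shape, dtype=np.uint8, buffer=_shm.buf)

def process_slice(i):
    #stand-in for cv2.GaussianBlur: invert one 2D slice in place
    _stack[i] = 255 - _stack[i]

def run_pool_demo(shape=(16, 32, 32)):
    shm = shared_memory.SharedMemory(create=True, size=int(np.prod(shape)))
    stack = np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
    stack[:] = 10  #fill with known data
    with Pool(min(4, cpu_count()), initializer=init_worker,
              initargs=(shm.name, shape)) as pool:
        #workers mutate the shared stack in place, so map's return value is uninteresting
        pool.map(process_slice, range(shape[0]))
    result = stack.copy()  #copy out before releasing the shared block
    shm.close()
    shm.unlink()
    return result

if __name__ == "__main__":
    out = run_pool_demo()
    print(out[0, 0, 0])  #255 - 10 = 245
```

Note that because the work happens in place through the shared buffer, only the small integer indices travel through the pool's pickling machinery, never the image data itself.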