
I want to use multiprocessing to change the values in a large NumPy array, with each worker modifying part of it.

That is to say, I want to get [[100, 100, 100], [100, 100, 100]] in the end.

However, the following code is wrong; it fails with "RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance".

What should I do? Thanks.

import numpy as np
import multiprocessing

from multiprocessing import RawArray, Array


def change_array(array, i, j):
    X_np = np.frombuffer(array.get_obj(), dtype=np.float64).reshape(2, 3)
    X_np[i, j] = 100
    print(np.frombuffer(array.get_obj()))

if __name__ == '__main__':
    X_shape = (2, 3)
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    X = Array('d', X_shape[0] * X_shape[1])
    # Wrap X as a NumPy array so we can easily manipulate its data.
    X_np = np.frombuffer(X.get_obj()).reshape(X_shape)
    # Copy data to our shared array.
    np.copyto(X_np, data)

    pool = multiprocessing.Pool(processes=3)

    result = []
    for i in range(2):
        for j in range(3):
            result.append(pool.apply_async(change_array, (X, i, j,)))

    result = [r.get() for r in result]
    pool.close()
    pool.join()

    print(np.frombuffer(X.get_obj()).reshape(2, 3))

1 Comment
  • You'll need to use Pool's initializer to pass the shared array. Commented Dec 8, 2020 at 14:15

2 Answers


You need to make two changes:

  1. Use a multiprocessing.Array instance with locking (which is actually the default) rather than a "plain" Array.
  2. Do not pass the array instance as an argument to your worker function. Instead, initialize each process in your pool with the array as a global value, as in the code below.
import numpy as np
import multiprocessing



def initpool(arr):
    global array
    array = arr

def change_array(i, j):
    X_np = np.frombuffer(array.get_obj(), dtype=np.float64).reshape(2, 3)
    X_np[i, j] = 100
    print(np.frombuffer(array.get_obj()))

if __name__ == '__main__':
    X_shape = (2, 3)
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    X = multiprocessing.Array('d', X_shape[0] * X_shape[1], lock=True)
    # Wrap X as a NumPy array so we can easily manipulate its data.
    X_np = np.frombuffer(X.get_obj()).reshape(X_shape)
    # Copy data to our shared array.
    np.copyto(X_np, data)

    pool = multiprocessing.Pool(processes=3, initializer=initpool, initargs=(X,))

    result = []
    for i in range(2):
        for j in range(3):
            result.append(pool.apply_async(change_array, (i, j,)))

    result = [r.get() for r in result]
    pool.close()
    pool.join()

    print(np.frombuffer(X.get_obj()).reshape(2, 3))

Prints (the exact interleaving of the worker print lines may vary from run to run, since the tasks execute concurrently):

[100.    2.2   3.3   4.4   5.5   6.6]
[100.  100.    3.3   4.4   5.5   6.6]
[100.  100.  100.    4.4   5.5   6.6]
[100.  100.  100.  100.    5.5   6.6]
[100.  100.  100.  100.  100.    6.6]
[100. 100. 100. 100. 100. 100.]
[[100. 100. 100.]
 [100. 100. 100.]]

Update

Since in this case the values being written to the data array do not depend on the values already in it, change_array does not actually need access to the array; as suggested by Frank Yellin, it can instead just return a tuple of the indices to be changed along with the new value. I did, however, want to show how to pass the array for situations where the function genuinely needs to read or modify it. For this particular problem, the following code is all you need (I have made a few simplifications):

import numpy as np
import multiprocessing


def change_array(i, j):
    return i, j, 100

if __name__ == '__main__':
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    with multiprocessing.Pool(processes=3) as pool:
        result = [pool.apply_async(change_array, (i, j)) for i in range(2) for j in range(3)]
        for r in result:
            i, j, value = r.get()
            data[i, j] = value
        print(data)

Or:

import numpy as np
import multiprocessing
import itertools


def change_array(t):
    i, j = t
    return i, j, 100

if __name__ == '__main__':
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    with multiprocessing.Pool(processes=3) as pool:
        for i, j, value in pool.map(change_array, itertools.product(range(2), range(3))):
            data[i, j] = value
        print(data)

2 Comments

Thank you very much, that is exactly what I wanted to solve!
Thanks. Also, when the project is not a single source file, it seems the pool initializer must be defined in the same module as the worker function; otherwise, setting the global variable won't make it part of the worker's module at runtime, and it will remain undefined for the worker.
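To illustrate that comment, here is a minimal sketch. The file names workers.py and main.py are hypothetical; the point is only that the initializer and the worker live in the same module, so the global set by one is visible to the other in every pool process:

# workers.py (hypothetical) -- initializer and worker share this module's globals
import numpy as np

def initpool(arr):
    global array            # set in workers.py's module globals...
    array = arr

def change_array(i, j):
    # ...so this lookup finds it at runtime in each pool process
    X_np = np.frombuffer(array.get_obj(), dtype=np.float64).reshape(2, 3)
    X_np[i, j] = 100

# main.py (hypothetical) -- builds the shared array and the pool
import multiprocessing
import numpy as np
import workers

if __name__ == '__main__':
    X = multiprocessing.Array('d', 6, lock=True)
    with multiprocessing.Pool(processes=3, initializer=workers.initpool,
                              initargs=(X,)) as pool:
        pool.starmap(workers.change_array,
                     [(i, j) for i in range(2) for j in range(3)])
    print(np.frombuffer(X.get_obj()).reshape(2, 3))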

The most important rule of multiprocessing: you do not want to be modifying a shared object in your subprocesses if at all possible. You want your worker function to be:

def change_array(i, j):
    value = ...  # whatever value goes here
    return i, j, value

Your main process would then read the (i, j, value) tuples that are returned and set each array element to the right value.

3 Comments

Thank you for your reply. I don't know whether that is fast, though. The array in my application has shape 18000 × 36000, and I need to set each element with array[i][j] = value individually. That is very slow and will take 30+ hours. Is the array[i][j] assignment itself the slow part?
That works here only because, in this instance, change_array doesn't even need to access the array: the value being set is totally independent of any value currently in the array (or of i and j, for that matter). But what if the problem required looking at the array's values to compute the return value? You haven't specified how to resolve the OP's original problem of how change_array can access the array instance.
I agree with the other answer posted here (@booboo) that if you really, really need a shared array, use multiprocessing.Array. But my experience has been that multiprocessing works better with as little shared data as possible. If each worker can be passed the small amount of data it needs to calculate its result and can operate independently of all other workers, your code will be easier to understand and debug.
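As a minimal sketch of that approach, and one way to cut the per-element overhead the questioner describes above: give each task a whole row rather than a single element, so the inter-process traffic is one message per row instead of one per cell. compute_value here is a hypothetical stand-in for the real per-element calculation:

import numpy as np
import multiprocessing

def compute_value(i, j):
    # Hypothetical stand-in for whatever the real per-element work is.
    return 100.0

def change_row(args):
    # One task per row: the per-task pickling/IPC cost is paid once per
    # row instead of once per element.
    i, n_cols = args
    return i, np.array([compute_value(i, j) for j in range(n_cols)])

if __name__ == '__main__':
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    n_rows, n_cols = data.shape
    with multiprocessing.Pool(processes=3) as pool:
        tasks = ((i, n_cols) for i in range(n_rows))
        for i, row in pool.imap_unordered(change_row, tasks):
            data[i] = row  # only the main process writes, so no locking is needed
    print(data)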
