Desidero confrontare le diverse aree di un array bidimensionale $ A $ con un array specificato $ b $ di dimensioni inferiori. Dato che devo farlo molte volte, è fondamentale che questo sia eseguito molto velocemente. Ho una soluzione che funziona bene, ma speravo che potesse essere fatto più bello e più veloce.Qual è il modo più veloce per confrontare le patch di un array?
In dettaglio:
Diciamo che abbiamo un grande array e un piccolo array. I loop su tutte le possibili "patch" all'interno del grande array che hanno le stesse dimensioni del piccolo array e confrontare queste patch con il piccolo array dato.
def get_best_fit(big_array, small_array):
# we assume the small array is square
patch_size = small_array.shape[0]
min_value = np.inf
for x in range(patch_size, big_array.shape[0] - patch_size):
for y in range(patch_size, big_array.shape[1] - patch_size):
p = get_patch_term(x, y, patch_size, big_array)
tmp = some_metric(p, small_array)
if min_value > tmp:
min_value = tmp
min_patch = p
return min_patch, min_value
Al fine di ottenere le patch ho avuto questa implementazione di accesso agli array diretta:
def get_patch_term(x, y, patch_size, data):
"""
a patch has the size (patch_size)^^2
"""
patch = data[(x - (patch_size-1)/2): (x + (patch_size-1)/2 + 1),
(y - (patch_size-1)/2): (y + (patch_size-1)/2 + 1)]
return patch
immagino che questo è il compito più importante e può essere eseguita più veloce, ma non sono sicuro su di esso.
Ho dato un'occhiata a Cython ma forse ho sbagliato, non ho molta familiarità con esso.
Il mio primo tentativo è stato una traduzione diretta in Cython:
def get_patch_term_fast(Py_ssize_t x_i, Py_ssize_t y_i,
Py_ssize_t patch_size,
np.ndarray[DTYPE_t, ndim=2] big_array):
assert big_array.dtype == DTYPE
patch_size = (patch_size - 1)/2
cdef np.ndarray[DTYPE_t, ndim=2] patch = <np.ndarray[DTYPE_t, ndim=2]>big_array[(x_i - patch_size):(x_i + patch_size + 1), (y_i - patch_size): (y_i + patch_size + 1)]
return patch
E questo sembra essere più veloce (vedi sotto), ma ho pensato che un approccio parallelo dovrebbe essere migliore, così mi si avvicinò con questo
def get_patch_term_fast_parallel(Py_ssize_t x_i, Py_ssize_t y_i,
Py_ssize_t patch_size,
np.ndarray[DTYPE_t, ndim=2] big_array):
assert big_array.dtype == DTYPE
patch_size = (patch_size - 1)/2
assert big_array.dtype == DTYPE
cdef Py_ssize_t x
cdef Py_ssize_t y
cdef np.ndarray[DTYPE_t, ndim=1] patch = np.empty(np.power((2 * patch_size) + 1, 2))
with nogil, parallel():
for x in prange(x_i - patch_size, x_i + patch_size + 1):
for y in prange(y_i - patch_size, y_i + patch_size + 1):
patch[((x - (x_i - patch_size)) * (2 * patch_size + 1)) + (y - (y_i - patch_size))] = big_array[x, y]
#cdef np.ndarray[DTYPE_t, ndim=2] patch = <np.ndarray[DTYPE_t, ndim=2]>big_array[(x_i - patch_size):(x_i + patch_size + 1), (y_i - patch_size): (y_i + patch_size + 1)]
return patch
Che, sfortunatamente, non è più veloce. Per la prova ho usato:
A = np.array(range(1200), dtype=np.float).reshape(30, 40)
b = np.array([41, 42, 81, 84]).reshape(2, 2)
x = 7
y = 7
print(timeit.timeit(lambda:get_patch_term_fast(x, y, 3, A), number=300))
print(timeit.timeit(lambda:get_patch_term_fast_parallel(x, y, 3, A).reshape(3,3), number=300))
print(timeit.timeit(lambda:get_patch_term(x, y, 3, A), number=300))
che dà
0.0008792859734967351
0.0029909340664744377
0.0029337930027395487
Quindi, la mia prima domanda è, è possibile farlo più velocemente? La seconda domanda sarebbe: perché l'approccio parallelo non è più veloce rispetto all'implementazione numpy originale?
Edit:
ho cercato di parallelizzare ulteriormente la funzione get_best_fit ma purtroppo ho un sacco di errori affermando che non posso assegnare un oggetto Python, senza gil.
Ecco il codice:
def get_best_fit_fast(np.ndarray[DTYPE_t, ndim=2] big_array,
np.ndarray[DTYPE_t, ndim=2] small_array):
# we assume the small array is square
cdef Py_ssize_t patch_size = small_array.shape[0]
cdef Py_ssize_t x
cdef Py_ssize_t y
cdef Py_ssize_t x_range = big_array.shape[0] - patch_size
cdef Py_ssize_t y_range = big_array.shape[1] - patch_size
cdef np.ndarray[DTYPE_t, ndim=2] p
cdef np.ndarray[DTYPE_t, ndim=2] weights = np.empty((x_range - patch_size)*(y_range - patch_size)).reshape((x_range - patch_size), (y_range - patch_size))
with nogil, parallel():
for x in prange(patch_size, x_range):
for y in prange(patch_size, y_range):
p = get_patch_term_fast(x, y, patch_size, big_array)
weights[x - patch_size, y - patch_size] = np.linalg.norm(np.abs(p - small_array))
return np.min(weights)
PS: ho omesso la parte di restituire il più piccolo cerotto ...
In base alla correlazione incrociata 'some_metric' (o qualcosa di simile, a seconda della metrica) che utilizza trasformazioni di Fourier veloci potrebbe essere più veloce. – DavidW
In ogni caso, prendere semplici slice in numpy è davvero abbastanza efficiente (non implica nemmeno la copia di qualcosa) quindi è improbabile che lo si possa battere in Cython. Potresti avere più fortuna nell'applicare Cython ai loop in "get_best_fit". – DavidW
Sfortunatamente, una traduzione one-to-one della funzione get_best_fit in Cython non offre alcun vantaggio in termini di velocità. E non riesco a farlo funzionare in parallelo. Anche se dovrebbe funzionare teoricamente, l'assegnazione di oggetti all'interno del ciclo parallelo mi dà problemi. – mijc