
The benchmark below compares a family of alternative implementations, including one transcribed from a now-deleted answer, on a square matrix of increasing size. The results show that:
- You should never use the original method.
- For small inputs (up to ~200 items to a side), the most efficient method is a grouped sum using `bincount` to calculate antitraces, then dividing by the trace lengths to get means (illustrated just before the full code below).
- For large inputs, if you know the size of the array ahead of time and want to re-run multiple times with differing data, pre-calculate a sparse coefficient matrix; this reduces the repeated portion to a single sparse matrix multiplication.
- Similar to pre-calculating that linear operator but even faster, you can prepare the supporting indices for the `bincount` method (described below).
- For large inputs, if you don't know the size of the array across multiple calls, then `bincount` is still probably best.
The results above also hold for very tall, narrow (m >> n) and short, wide (m << n) inputs.
Specific to your own implementation: replace the module-level `np.mean(...)` call with the array's `.mean()` method, and don't recompute `x[::-1]` on every iteration; compute it once outside the loop (as `preflip` below does).
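As a quick illustration of the grouped-sum idea (a minimal sketch, not part of the benchmark code): elements x[i, j] with equal i + j lie on the same antidiagonal, so a weighted `bincount` over i + j sums every antidiagonal in one pass. The implementations below compute the antidiagonal lengths analytically instead of with a second `bincount`, but the result is the same:

import numpy as np

x = np.array([[1, 2, 3],
              [4, 5, 6]])
m, n = x.shape
idx = np.add.outer(np.arange(m), np.arange(n))      # idx[i, j] == i + j, the antidiagonal index
sums = np.bincount(idx.ravel(), weights=x.ravel())  # grouped sums ("antitraces")
lengths = np.bincount(idx.ravel())                  # number of elements per antidiagonal
print(sums / lengths)                               # [1. 3. 4. 6.]

Full code, correctness test and benchmark: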
import functools
import timeit
import typing
import numpy as np
import pandas as pd
import scipy
import seaborn
from matplotlib import pyplot as plt
def original(x: np.ndarray) -> np.ndarray:
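    # Implementation from the question: one Python-level mean per antidiagonal of the flipped array.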
x1d = [
np.mean(x[::-1, :].diagonal(i))
for i in range(-x.shape[0] + 1, x.shape[1])
]
return np.array(x1d)
def gareth_rees(a: np.ndarray) -> np.ndarray:
h, w = a.shape # Height and width of a
j, i = np.mgrid[:h, :w] # Indexes of elements of a
n = h + w - 1 # Number of antidiagonals
longest = min(h, w) # Length of longest antidiagonal
diags = np.zeros((n, longest)) # Array of antidiagonals
diags[i + j, i - np.maximum(0, i + j - h + 1)] = a
k = np.arange(n) # Indexes of antidiagonals
l = np.minimum(longest, np.minimum(k + 1, n - k)) # Lengths of antidiagonals
return diags.sum(axis=1) / l # Means of antidiagonals
def preflip(x: np.ndarray) -> np.ndarray:
m, n = x.shape
vflip = x[::-1]
return np.array([vflip.diagonal(i).mean() for i in range(1 - m, n)])
def trace_loop(x: np.ndarray) -> np.ndarray:
m, n = x.shape
vflip = x[::-1]
traces = np.array([vflip.trace(i) for i in range(1 - m, n)], dtype=np.float64)
smaller = min(x.shape)
upper_idx = m + n - smaller
traces[:smaller] /= np.arange(1, 1 + smaller) # Lower triangular diagonals
traces[smaller: upper_idx] /= smaller # Lower and upper rectangular diagonals
traces[upper_idx:] /= np.arange(smaller - 1, 0, -1) # Upper triangular diagonals
return traces
def bincount(x: np.ndarray) -> np.ndarray:
"""
Based on variable-sum method in
https://numpy.org/doc/stable/reference/generated/numpy.bincount.html#codecell3
"""
m, n = x.shape
i = np.arange(m)
j = np.arange(n)
idx = i[:, np.newaxis] + j
traces = np.bincount(idx.ravel(), weights=x.ravel())
smaller = min(x.shape)
upper_idx = m + n - smaller
traces[:smaller] /= np.arange(1, 1 + smaller) # Lower triangular diagonals
traces[smaller: upper_idx] /= smaller # Lower and upper rectangular diagonals
traces[upper_idx:] /= np.arange(smaller - 1, 0, -1) # Upper triangular diagonals
return traces
def linear_prep(shape: tuple[int, int]) -> scipy.sparse.sparray:
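    """
    Build a sparse (m*n, m + n - 1) coefficient matrix: the row for element
    (i, j) holds a single nonzero, 1/length of antidiagonal i + j, in column
    i + j, so that x.ravel() @ matrix yields the antidiagonal means directly.
    """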
m, n = shape
smaller = min(shape)
upper_idx = m + n - smaller
lengths = np.concatenate((
np.arange(1, 1 + smaller),
np.full(shape=upper_idx - smaller, fill_value=smaller),
np.arange(smaller - 1, 0, -1),
))
data = np.lib.stride_tricks.sliding_window_view(
x=1/lengths, window_shape=n,
)
return scipy.sparse.csr_array(
(
data.ravel(),
np.add.outer( # indices
np.arange(m), np.arange(n),
).ravel(),
np.arange(1 + m*n), # indptr
),
shape=(m*n, m + n - 1),
)
def linear_dry(x: np.ndarray) -> np.ndarray:
return x.ravel() @ linear_prep(x.shape)
def linear_prepped(x: np.ndarray, right: scipy.sparse.sparray) -> np.ndarray:
return x.ravel() @ right
METHODS = (
gareth_rees,
original,
preflip,
linear_dry,
trace_loop,
bincount,
)
def bound_methods(
rand: np.random.Generator, shape: tuple[int, int],
) -> tuple[
np.ndarray,
list[
tuple[
str,
typing.Callable[[np.ndarray], np.ndarray],
]
],
]:
array = rand.integers(low=0, high=10, size=shape)
linear_right = linear_prep(shape)
methods = [
(method.__name__, method) for method in METHODS
] + [
('linear_prepped', functools.partial(linear_prepped, right=linear_right)),
]
return array, methods
def test() -> None:
rand = np.random.default_rng(seed=0)
for shape in (
(3, 4),
(4, 3),
(24, 25),
(25, 24),
(1, 5),
(5, 1),
(1, 1),
):
array, ((ref_name, ref_method), *methods) = bound_methods(rand=rand, shape=shape)
ref_means = ref_method(array)
for name, method in methods:
means = method(array)
assert np.allclose(ref_means, means, atol=0, rtol=1e-14)
def benchmark() -> plt.Figure:
rand = np.random.default_rng(seed=0)
results = []
for size in np.logspace(start=0.5, stop=3.6, num=100, dtype=int):
print('size =', size)
shape = size, size
array, methods = bound_methods(rand=rand, shape=shape)
for name, method in methods:
if method == gareth_rees and size > 1e3:
continue # too slow
def run() -> np.ndarray:
return method(array)
t = timeit.timeit(stmt=run, number=1)
result = name, size, t
results.append(result)
df = pd.DataFrame(data=results, columns=('method', 'size', 't'))
fig, ax = plt.subplots()
seaborn.lineplot(ax=ax, data=df, x='size', y='t', hue='method')
ax.set_xscale('log')
ax.set_yscale('log')
return fig
if __name__ == '__main__':
test()
benchmark()
plt.show()
Prepared bincount
Similar to the prepared linear operator, if the array shape is constant across several calls you can pre-calculate the supporting indices (and reciprocal diagonal lengths) for the `bincount` method; a usage sketch follows the code:
def bincount_prep(shape: tuple[int, int]) -> tuple[np.ndarray, np.ndarray]:
m, n = shape
i = np.arange(m)
j = np.arange(n)
idx = (i[:, np.newaxis] + j).ravel()
smaller = min(shape)
upper_idx = m + n - smaller
coef = 1/np.concatenate((
np.arange(1, 1 + smaller), # Lower triangular diagonals
np.full(shape=upper_idx - smaller, fill_value=smaller), # Lower and upper rectangular diagonals
np.arange(smaller - 1, 0, -1), # Upper triangular diagonals
))
return idx, coef
def bincount_prepped(
x: np.ndarray,
idx: np.ndarray,
coef: np.ndarray,
) -> np.ndarray:
traces = np.bincount(idx, weights=x.ravel())
return traces * coef
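A possible usage pattern (a sketch, not from the original post): prepare once for a fixed shape, then reuse across many arrays; the result matches the one-shot `bincount` method defined above:

rand = np.random.default_rng(seed=0)
shape = 500, 500
idx, coef = bincount_prep(shape)   # pay the index/length setup cost once

for _ in range(10):                # many arrays of the same shape
    x = rand.integers(low=0, high=10, size=shape)
    means = bincount_prepped(x, idx=idx, coef=coef)
    assert np.allclose(means, bincount(x), atol=0, rtol=1e-14)  # agrees with the unprepared method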
