Newer
Older
import pystencils as ps
from pystencils.rng import PhiloxFourFloats, PhiloxTwoDoubles, AESNIFourFloats, AESNITwoDoubles, random_symbol
from pystencils.backends.simd_instruction_sets import get_supported_instruction_sets
from pystencils.cpu.cpujit import get_compiler_config
RNGs = {('philox', 'float'): PhiloxFourFloats, ('philox', 'double'): PhiloxTwoDoubles,
('aesni', 'float'): AESNIFourFloats, ('aesni', 'double'): AESNITwoDoubles}
instruction_sets = get_supported_instruction_sets()
if get_compiler_config()['os'] == 'windows':
# skip instruction sets supported by the CPU but not by the compiler
if 'avx' in instruction_sets and ('/arch:avx2' not in get_compiler_config()['flags'].lower()
and '/arch:avx512' not in get_compiler_config()['flags'].lower()):
instruction_sets.remove('avx')
if 'avx512' in instruction_sets and '/arch:avx512' not in get_compiler_config()['flags'].lower():
instruction_sets.remove('avx512')
if 'avx512vl' in instruction_sets and '/arch:avx512' not in get_compiler_config()['flags'].lower():
instruction_sets.remove('avx512vl')
@pytest.mark.parametrize('target, rng', ((Target.CPU, 'philox'), (Target.CPU, 'aesni'), (Target.GPU, 'philox')))
@pytest.mark.parametrize('precision', ('float', 'double'))
@pytest.mark.parametrize('dtype', ('float', 'double'))
def test_rng(target, rng, precision, dtype, t=124, offsets=(0, 0), keys=(0, 0), offset_values=None):
if instruction_sets and {'neon', 'sve', 'sve2', 'sme', 'vsx', 'rvv'}.intersection(instruction_sets) and rng == 'aesni':
pytest.xfail('AES not yet implemented for this architecture')
if rng == 'aesni' and len(keys) == 2:
keys *= 2
if offset_values is None:
offset_values = offsets
dh = ps.create_data_handling((2, 2), default_ghost_layers=0, default_target=target)
f = dh.add_array("f", values_per_cell=4 if precision == 'float' else 2,
dtype=np.float32 if dtype == 'float' else np.float64)
dh.fill(f.name, 42.0)
rng_node = RNGs[(rng, precision)](dh.dim, offsets=offsets, keys=keys)
assignments = [rng_node] + [SympyAssignment(f(i), s) for i, s in enumerate(rng_node.result_symbols)]
kernel = ps.create_kernel(assignments, target=dh.default_target).compile()
kwargs = {'time_step': t}
if offset_values != offsets:
kwargs.update({k.name: v for k, v in zip(offsets, offset_values)})
dh.run_kernel(kernel, **kwargs)
arr = dh.gather_array(f.name)
assert np.logical_and(arr <= 1.0, arr >= 0).all()
if rng == 'philox' and t == 124 and offsets == (0, 0) and keys == (0, 0) and dh.shape == (2, 2):
int_reference = np.array([[[3576608082, 1252663339, 1987745383, 348040302],
[1032407765, 970978240, 2217005168, 2424826293]],
[[2958765206, 3725192638, 2623672781, 1373196132],
[850605163, 1694561295, 3285694973, 2799652583]]])
else:
pytest.importorskip('randomgen')
if rng == 'aesni':
from randomgen import AESCounter
int_reference = np.empty(dh.shape + (4,), dtype=int)
for x in range(dh.shape[0]):
for y in range(dh.shape[1]):
r = AESCounter(counter=t + (x + offset_values[0]) * 2 ** 32 + (y + offset_values[1]) * 2 ** 64,
key=keys[0] + keys[1] * 2 ** 32 + keys[2] * 2 ** 64 + keys[3] * 2 ** 96,
mode="sequence")
a, b = r.random_raw(size=2)
int_reference[x, y, :] = [a % 2 ** 32, a // 2 ** 32, b % 2 ** 32, b // 2 ** 32]
else:
from randomgen import Philox
int_reference = np.empty(dh.shape + (4,), dtype=int)
for x in range(dh.shape[0]):
for y in range(dh.shape[1]):
r = Philox(counter=t + (x + offset_values[0]) * 2 ** 32 + (y + offset_values[1]) * 2 ** 64 - 1,
key=keys[0] + keys[1] * 2 ** 32, number=4, width=32, mode="sequence")
int_reference[x, y, :] = r.random_raw(size=4)
if precision == 'float' or dtype == 'float':
eps = np.finfo(np.float32).eps
else:
eps = np.finfo(np.float64).eps
if rng == 'aesni': # precision appears to be slightly worse
eps = max(1e-12, 2 * eps)
if precision == 'float':
reference = int_reference * 2. ** -32 + 2. ** -33
else:
x = int_reference[:, :, 0::2]
y = int_reference[:, :, 1::2]
z = x ^ y << (53 - 32)
reference = z * 2. ** -53 + 2. ** -54
assert np.allclose(arr, reference, rtol=0, atol=eps)
@pytest.mark.parametrize('vectorized', (False, True))
@pytest.mark.parametrize('kind', ('value', 'symbol'))
def test_rng_offsets(kind, vectorized):
if vectorized:
test = test_rng_vectorized
if not instruction_sets:
pytest.skip("cannot detect CPU instruction set")
else:
test = test_rng
if kind == 'value':
test(instruction_sets[-1] if vectorized else Target.CPU, 'philox', 'float', 'float', t=8,
offsets=(6, 7), keys=(5, 309))
elif kind == 'symbol':
offsets = (TypedSymbol("x0", np.uint32), TypedSymbol("y0", np.uint32))
test(instruction_sets[-1] if vectorized else Target.GPU, 'philox', 'float', 'float', t=8,
offsets=offsets, offset_values=(6, 7), keys=(5, 309))
@pytest.mark.parametrize('target', instruction_sets)
@pytest.mark.parametrize('rng', ('philox', 'aesni'))
@pytest.mark.parametrize('precision,dtype', (('float', 'float'), ('double', 'double')))
def test_rng_vectorized(target, rng, precision, dtype, t=130, offsets=(1, 3), keys=(0, 0), offset_values=None):
if (target in ['neon', 'vsx', 'rvv', 'sme'] or target.startswith('sve')) and rng == 'aesni':
pytest.xfail('AES not yet implemented for this architecture')
cpu_vectorize_info = {'assume_inner_stride_one': True, 'assume_aligned': True, 'instruction_set': target}
dh = ps.create_data_handling((131, 131), default_ghost_layers=0, default_target=Target.CPU)
f = dh.add_array("f", values_per_cell=4 if precision == 'float' else 2,
dtype=np.float32 if dtype == 'float' else np.float64, alignment=True)
dh.fill(f.name, 42.0)
ref = dh.add_array("ref", values_per_cell=4 if precision == 'float' else 2)
rng_node = RNGs[(rng, precision)](dh.dim, offsets=offsets)
assignments = [rng_node] + [SympyAssignment(ref(i), s) for i, s in enumerate(rng_node.result_symbols)]
kernel = ps.create_kernel(assignments, target=dh.default_target).compile()
kwargs = {'time_step': t}
if offset_values is not None:
kwargs.update({k.name: v for k, v in zip(offsets, offset_values)})
dh.run_kernel(kernel, **kwargs)
rng_node = RNGs[(rng, precision)](dh.dim, offsets=offsets)
assignments = [rng_node] + [SympyAssignment(f(i), s) for i, s in enumerate(rng_node.result_symbols)]
kernel = ps.create_kernel(assignments, target=dh.default_target, cpu_vectorize_info=cpu_vectorize_info).compile()
dh.run_kernel(kernel, **kwargs)
ref_data = dh.gather_array(ref.name)
data = dh.gather_array(f.name)
assert np.allclose(ref_data, data)
@pytest.mark.parametrize('vectorized', (False, True))
def test_rng_symbol(vectorized):
"""Make sure that the RNG symbol generator generates symbols and that the resulting code compiles"""
if vectorized:
if not instruction_sets:
pytest.skip("cannot detect CPU instruction set")
else:
cpu_vectorize_info = {'assume_inner_stride_one': True, 'assume_aligned': True,
dh = ps.create_data_handling((8, 8), default_ghost_layers=0, default_target=Target.CPU)
f = dh.add_array("f", values_per_cell=2 * dh.dim, alignment=True)
nc = NodeCollection([SympyAssignment(f(i), 0) for i in range(f.shape[-1])])
subexpressions = []
rng_symbol_gen = random_symbol(subexpressions, dim=dh.dim)
for i in range(f.shape[-1]):
nc.all_assignments[i] = SympyAssignment(nc.all_assignments[i].lhs, next(rng_symbol_gen))
symbols = [a.rhs for a in nc.all_assignments]
[nc.all_assignments.insert(0, subexpression) for subexpression in subexpressions]
assert len(symbols) == f.shape[-1] and len(set(symbols)) == f.shape[-1]
ps.create_kernel(nc, target=dh.default_target, cpu_vectorize_info=cpu_vectorize_info).compile()
@pytest.mark.parametrize('vectorized', (False, True))
def test_staggered(vectorized):
"""Make sure that the RNG counter can be substituted during loop cutting"""
dh = ps.create_data_handling((8, 8), default_ghost_layers=0, default_target=Target.CPU)
j = dh.add_array("j", values_per_cell=dh.dim, field_type=ps.FieldType.STAGGERED_FLUX)
a = ps.AssignmentCollection([ps.Assignment(j.staggered_access(n), 0) for n in j.staggered_stencil])
rng_symbol_gen = random_symbol(a.subexpressions, dim=dh.dim, rng_node=PhiloxTwoDoubles)
a.main_assignments[0] = ps.Assignment(a.main_assignments[0].lhs, next(rng_symbol_gen))
kernel = ps.create_staggered_kernel(a, target=dh.default_target).compile()
if not vectorized:
return
if not instruction_sets:
pytest.skip("cannot detect CPU instruction set")
pytest.importorskip('islpy')
cpu_vectorize_info = {'assume_inner_stride_one': True, 'assume_aligned': False,
dh.fill(j.name, 867)
dh.run_kernel(kernel, seed=5, time_step=309)
ref_data = dh.gather_array(j.name)
kernel2 = ps.create_staggered_kernel(a, target=dh.default_target, cpu_vectorize_info=cpu_vectorize_info).compile()
dh.fill(j.name, 867)
dh.run_kernel(kernel2, seed=5, time_step=309)
data = dh.gather_array(j.name)
assert np.allclose(ref_data, data)