Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Showing
with 2158 additions and 0 deletions
import numpy as np
import pytest
import pystencils as ps
from pystencils.astnodes import SympyAssignment
from pystencils.node_collection import NodeCollection
from pystencils.rng import PhiloxFourFloats, PhiloxTwoDoubles, AESNIFourFloats, AESNITwoDoubles, random_symbol
from pystencils.backends.simd_instruction_sets import get_supported_instruction_sets
from pystencils.cpu.cpujit import get_compiler_config
from pystencils.typing import TypedSymbol
from pystencils.enums import Target
RNGs = {('philox', 'float'): PhiloxFourFloats, ('philox', 'double'): PhiloxTwoDoubles,
('aesni', 'float'): AESNIFourFloats, ('aesni', 'double'): AESNITwoDoubles}
instruction_sets = get_supported_instruction_sets()
if get_compiler_config()['os'] == 'windows':
# skip instruction sets supported by the CPU but not by the compiler
if 'avx' in instruction_sets and ('/arch:avx2' not in get_compiler_config()['flags'].lower()
and '/arch:avx512' not in get_compiler_config()['flags'].lower()):
instruction_sets.remove('avx')
if 'avx512' in instruction_sets and '/arch:avx512' not in get_compiler_config()['flags'].lower():
instruction_sets.remove('avx512')
if 'avx512vl' in instruction_sets and '/arch:avx512' not in get_compiler_config()['flags'].lower():
instruction_sets.remove('avx512vl')
@pytest.mark.parametrize('target, rng', ((Target.CPU, 'philox'), (Target.CPU, 'aesni'), (Target.GPU, 'philox')))
@pytest.mark.parametrize('precision', ('float', 'double'))
@pytest.mark.parametrize('dtype', ('float', 'double'))
def test_rng(target, rng, precision, dtype, t=124, offsets=(0, 0), keys=(0, 0), offset_values=None):
if target == Target.GPU:
pytest.importorskip('cupy')
if instruction_sets and {'neon', 'sve', 'sve2', 'sme', 'vsx', 'rvv'}.intersection(instruction_sets) and rng == 'aesni':
pytest.xfail('AES not yet implemented for this architecture')
if rng == 'aesni' and len(keys) == 2:
keys *= 2
if offset_values is None:
offset_values = offsets
dh = ps.create_data_handling((2, 2), default_ghost_layers=0, default_target=target)
f = dh.add_array("f", values_per_cell=4 if precision == 'float' else 2,
dtype=np.float32 if dtype == 'float' else np.float64)
dh.fill(f.name, 42.0)
rng_node = RNGs[(rng, precision)](dh.dim, offsets=offsets, keys=keys)
assignments = [rng_node] + [SympyAssignment(f(i), s) for i, s in enumerate(rng_node.result_symbols)]
kernel = ps.create_kernel(assignments, target=dh.default_target).compile()
dh.all_to_gpu()
kwargs = {'time_step': t}
if offset_values != offsets:
kwargs.update({k.name: v for k, v in zip(offsets, offset_values)})
dh.run_kernel(kernel, **kwargs)
dh.all_to_cpu()
arr = dh.gather_array(f.name)
assert np.logical_and(arr <= 1.0, arr >= 0).all()
if rng == 'philox' and t == 124 and offsets == (0, 0) and keys == (0, 0) and dh.shape == (2, 2):
int_reference = np.array([[[3576608082, 1252663339, 1987745383, 348040302],
[1032407765, 970978240, 2217005168, 2424826293]],
[[2958765206, 3725192638, 2623672781, 1373196132],
[850605163, 1694561295, 3285694973, 2799652583]]])
else:
pytest.importorskip('randomgen')
if rng == 'aesni':
from randomgen import AESCounter
int_reference = np.empty(dh.shape + (4,), dtype=int)
for x in range(dh.shape[0]):
for y in range(dh.shape[1]):
r = AESCounter(counter=t + (x + offset_values[0]) * 2 ** 32 + (y + offset_values[1]) * 2 ** 64,
key=keys[0] + keys[1] * 2 ** 32 + keys[2] * 2 ** 64 + keys[3] * 2 ** 96,
mode="sequence")
a, b = r.random_raw(size=2)
int_reference[x, y, :] = [a % 2 ** 32, a // 2 ** 32, b % 2 ** 32, b // 2 ** 32]
else:
from randomgen import Philox
int_reference = np.empty(dh.shape + (4,), dtype=int)
for x in range(dh.shape[0]):
for y in range(dh.shape[1]):
r = Philox(counter=t + (x + offset_values[0]) * 2 ** 32 + (y + offset_values[1]) * 2 ** 64 - 1,
key=keys[0] + keys[1] * 2 ** 32, number=4, width=32, mode="sequence")
int_reference[x, y, :] = r.random_raw(size=4)
if precision == 'float' or dtype == 'float':
eps = np.finfo(np.float32).eps
else:
eps = np.finfo(np.float64).eps
if rng == 'aesni': # precision appears to be slightly worse
eps = max(1e-12, 2 * eps)
if precision == 'float':
reference = int_reference * 2. ** -32 + 2. ** -33
else:
x = int_reference[:, :, 0::2]
y = int_reference[:, :, 1::2]
z = x ^ y << (53 - 32)
reference = z * 2. ** -53 + 2. ** -54
assert np.allclose(arr, reference, rtol=0, atol=eps)
@pytest.mark.parametrize('vectorized', (False, True))
@pytest.mark.parametrize('kind', ('value', 'symbol'))
def test_rng_offsets(kind, vectorized):
if vectorized:
test = test_rng_vectorized
if not instruction_sets:
pytest.skip("cannot detect CPU instruction set")
else:
test = test_rng
if kind == 'value':
test(instruction_sets[-1] if vectorized else Target.CPU, 'philox', 'float', 'float', t=8,
offsets=(6, 7), keys=(5, 309))
elif kind == 'symbol':
offsets = (TypedSymbol("x0", np.uint32), TypedSymbol("y0", np.uint32))
test(instruction_sets[-1] if vectorized else Target.GPU, 'philox', 'float', 'float', t=8,
offsets=offsets, offset_values=(6, 7), keys=(5, 309))
@pytest.mark.parametrize('target', instruction_sets)
@pytest.mark.parametrize('rng', ('philox', 'aesni'))
@pytest.mark.parametrize('precision,dtype', (('float', 'float'), ('double', 'double')))
def test_rng_vectorized(target, rng, precision, dtype, t=130, offsets=(1, 3), keys=(0, 0), offset_values=None):
if (target in ['neon', 'vsx', 'rvv', 'sme'] or target.startswith('sve')) and rng == 'aesni':
pytest.xfail('AES not yet implemented for this architecture')
cpu_vectorize_info = {'assume_inner_stride_one': True, 'assume_aligned': True, 'instruction_set': target}
dh = ps.create_data_handling((131, 131), default_ghost_layers=0, default_target=Target.CPU)
f = dh.add_array("f", values_per_cell=4 if precision == 'float' else 2,
dtype=np.float32 if dtype == 'float' else np.float64, alignment=True)
dh.fill(f.name, 42.0)
ref = dh.add_array("ref", values_per_cell=4 if precision == 'float' else 2)
rng_node = RNGs[(rng, precision)](dh.dim, offsets=offsets)
assignments = [rng_node] + [SympyAssignment(ref(i), s) for i, s in enumerate(rng_node.result_symbols)]
kernel = ps.create_kernel(assignments, target=dh.default_target).compile()
kwargs = {'time_step': t}
if offset_values is not None:
kwargs.update({k.name: v for k, v in zip(offsets, offset_values)})
dh.run_kernel(kernel, **kwargs)
rng_node = RNGs[(rng, precision)](dh.dim, offsets=offsets)
assignments = [rng_node] + [SympyAssignment(f(i), s) for i, s in enumerate(rng_node.result_symbols)]
kernel = ps.create_kernel(assignments, target=dh.default_target, cpu_vectorize_info=cpu_vectorize_info).compile()
dh.run_kernel(kernel, **kwargs)
ref_data = dh.gather_array(ref.name)
data = dh.gather_array(f.name)
assert np.allclose(ref_data, data)
@pytest.mark.parametrize('vectorized', (False, True))
def test_rng_symbol(vectorized):
"""Make sure that the RNG symbol generator generates symbols and that the resulting code compiles"""
cpu_vectorize_info = None
if vectorized:
if not instruction_sets:
pytest.skip("cannot detect CPU instruction set")
else:
cpu_vectorize_info = {'assume_inner_stride_one': True, 'assume_aligned': True,
'instruction_set': instruction_sets[-1]}
dh = ps.create_data_handling((8, 8), default_ghost_layers=0, default_target=Target.CPU)
f = dh.add_array("f", values_per_cell=2 * dh.dim, alignment=True)
nc = NodeCollection([SympyAssignment(f(i), 0) for i in range(f.shape[-1])])
subexpressions = []
rng_symbol_gen = random_symbol(subexpressions, dim=dh.dim)
for i in range(f.shape[-1]):
nc.all_assignments[i] = SympyAssignment(nc.all_assignments[i].lhs, next(rng_symbol_gen))
symbols = [a.rhs for a in nc.all_assignments]
[nc.all_assignments.insert(0, subexpression) for subexpression in subexpressions]
assert len(symbols) == f.shape[-1] and len(set(symbols)) == f.shape[-1]
ps.create_kernel(nc, target=dh.default_target, cpu_vectorize_info=cpu_vectorize_info).compile()
@pytest.mark.parametrize('vectorized', (False, True))
def test_staggered(vectorized):
"""Make sure that the RNG counter can be substituted during loop cutting"""
dh = ps.create_data_handling((8, 8), default_ghost_layers=0, default_target=Target.CPU)
j = dh.add_array("j", values_per_cell=dh.dim, field_type=ps.FieldType.STAGGERED_FLUX)
a = ps.AssignmentCollection([ps.Assignment(j.staggered_access(n), 0) for n in j.staggered_stencil])
rng_symbol_gen = random_symbol(a.subexpressions, dim=dh.dim, rng_node=PhiloxTwoDoubles)
a.main_assignments[0] = ps.Assignment(a.main_assignments[0].lhs, next(rng_symbol_gen))
kernel = ps.create_staggered_kernel(a, target=dh.default_target).compile()
if not vectorized:
return
if not instruction_sets:
pytest.skip("cannot detect CPU instruction set")
pytest.importorskip('islpy')
cpu_vectorize_info = {'assume_inner_stride_one': True, 'assume_aligned': False,
'instruction_set': instruction_sets[-1]}
dh.fill(j.name, 867)
dh.run_kernel(kernel, seed=5, time_step=309)
ref_data = dh.gather_array(j.name)
kernel2 = ps.create_staggered_kernel(a, target=dh.default_target, cpu_vectorize_info=cpu_vectorize_info).compile()
dh.fill(j.name, 867)
dh.run_kernel(kernel2, seed=5, time_step=309)
data = dh.gather_array(j.name)
assert np.allclose(ref_data, data)
from pystencils.cache import sharedmethodcache
class Fib:
def __init__(self):
self.fib_rec_called = 0
self.fib_iter_called = 0
@sharedmethodcache("fib_cache")
def fib_rec(self, n):
self.fib_rec_called += 1
return 1 if n <= 1 else self.fib_rec(n-1) + self.fib_rec(n-2)
@sharedmethodcache("fib_cache")
def fib_iter(self, n):
self.fib_iter_called += 1
f1, f2 = 0, 1
for i in range(n):
f2 = f1 + f2
f1 = f2 - f1
return f2
def test_fib_memoization_1():
fib = Fib()
assert "fib_cache" not in fib.__dict__
f13 = fib.fib_rec(13)
assert fib.fib_rec_called == 14
assert "fib_cache" in fib.__dict__
assert fib.fib_cache[(13,)] == f13
for k in range(14):
# fib_iter should use cached results from fib_rec
fib.fib_iter(k)
assert fib.fib_iter_called == 0
def test_fib_memoization_2():
fib = Fib()
f11 = fib.fib_iter(11)
f12 = fib.fib_iter(12)
assert fib.fib_iter_called == 2
f13 = fib.fib_rec(13)
# recursive calls should be cached
assert fib.fib_rec_called == 1
class Triad:
def __init__(self):
self.triad_called = 0
@sharedmethodcache("triad_cache")
def triad(self, a, b, c=0):
"""Computes the triad a*b+c."""
self.triad_called += 1
return a * b + c
def test_triad_memoization():
triad = Triad()
assert triad.triad.__doc__ == "Computes the triad a*b+c."
t = triad.triad(12, 4, 15)
assert triad.triad_called == 1
assert triad.triad_cache[(12, 4, 15)] == t
t = triad.triad(12, 4, c=15)
assert triad.triad_called == 2
assert triad.triad_cache[(12, 4, 'c', 15)] == t
t = triad.triad(12, 4, 15)
assert triad.triad_called == 2
t = triad.triad(12, 4, c=15)
assert triad.triad_called == 2
import sympy as sp
import pystencils as ps
from pystencils import Assignment, AssignmentCollection
from pystencils.simp import (
SimplificationStrategy, apply_on_all_subexpressions,
subexpression_substitution_in_existing_subexpressions)
def test_simplification_strategy():
a, b, c, d, x, y, z = sp.symbols("a b c d x y z")
s0, s1, s2, s3 = sp.symbols("s_:4")
a0, a1, a2, a3 = sp.symbols("a_:4")
subexpressions = [
Assignment(s0, 2 * a + 2 * b),
Assignment(s1, 2 * a + 2 * b + 2 * c),
Assignment(s2, 2 * a + 2 * b + 2 * c + 2 * d),
]
main = [
Assignment(a0, s0 + s1),
Assignment(a1, s0 + s2),
Assignment(a2, s1 + s2),
]
ac = AssignmentCollection(main, subexpressions)
strategy = SimplificationStrategy()
strategy.add(subexpression_substitution_in_existing_subexpressions)
strategy.add(apply_on_all_subexpressions(sp.factor))
result = strategy(ac)
assert result.operation_count['adds'] == 7
assert result.operation_count['muls'] == 4
assert result.operation_count['divs'] == 0
# Trigger display routines, such that they are at least executed
report = strategy.show_intermediate_results(ac, symbols=[s0])
assert 's_0' in str(report)
report = strategy.show_intermediate_results(ac)
assert 's_{1}' in report._repr_html_()
report = strategy.create_simplification_report(ac)
assert 'Adds' in str(report)
assert 'Adds' in report._repr_html_()
assert 'factor' in str(strategy)
def test_split_inner_loop():
dst = ps.fields('dst(8): double[2D]')
s = sp.symbols('s_:8')
x = sp.symbols('x')
subexpressions = []
main = [
Assignment(dst[0, 0](0), s[0]),
Assignment(dst[0, 0](1), s[1]),
Assignment(dst[0, 0](2), s[2]),
Assignment(dst[0, 0](3), s[3]),
Assignment(dst[0, 0](4), s[4]),
Assignment(dst[0, 0](5), s[5]),
Assignment(dst[0, 0](6), s[6]),
Assignment(dst[0, 0](7), s[7]),
Assignment(x, sum(s))
]
ac = AssignmentCollection(main, subexpressions)
split_groups = [[dst[0, 0](0), dst[0, 0](1)],
[dst[0, 0](2), dst[0, 0](3)],
[dst[0, 0](4), dst[0, 0](5)],
[dst[0, 0](6), dst[0, 0](7), x]]
ac.simplification_hints['split_groups'] = split_groups
ast = ps.create_kernel(ac)
code = ps.get_code_str(ast)
ps.show_code(ast)
# we have four inner loops as indicated in split groups (4 elements) plus one outer loop
assert code.count('for') == 5
ast = ps.create_kernel(ac, target=ps.Target.GPU)
code = ps.get_code_str(ast)
# on GPUs is wouldn't be good to use loop splitting
assert code.count('for') == 0
ac = AssignmentCollection(main, subexpressions)
ast = ps.create_kernel(ac)
code = ps.get_code_str(ast)
# one inner loop and one outer loop
assert code.count('for') == 2
from sys import version_info as vs
import pytest
import pystencils.config
import sympy as sp
import pystencils as ps
from pystencils import Assignment, AssignmentCollection, fields
from pystencils.simp import subexpression_substitution_in_main_assignments
from pystencils.simp import add_subexpressions_for_divisions
from pystencils.simp import add_subexpressions_for_sums
from pystencils.simp import add_subexpressions_for_field_reads
from pystencils.simp.simplifications import add_subexpressions_for_constants
from pystencils.typing import BasicType, TypedSymbol
a, b, c, d, x, y, z = sp.symbols("a b c d x y z")
s0, s1, s2, s3 = sp.symbols("s_:4")
f = sp.symbols("f_:9")
def test_subexpression_substitution_in_main_assignments():
subexpressions = [
Assignment(s0, 2 * a + 2 * b),
Assignment(s1, 2 * a + 2 * b + 2 * c),
Assignment(s2, 2 * a + 2 * b + 2 * c + 2 * d),
Assignment(s3, 2 * a + 2 * b * c),
Assignment(x, s1 + s2 + s0 + s3)
]
main = [
Assignment(f[0], s1 + s2 + s0 + s3),
Assignment(f[1], s1 + s2 + s0 + s3),
Assignment(f[2], s1 + s2 + s0 + s3),
Assignment(f[3], s1 + s2 + s0 + s3),
Assignment(f[4], s1 + s2 + s0 + s3)
]
ac = AssignmentCollection(main, subexpressions)
ac = subexpression_substitution_in_main_assignments(ac)
for i in range(0, len(ac.main_assignments)):
assert ac.main_assignments[i].rhs == x
def test_add_subexpressions_for_divisions():
subexpressions = [
Assignment(s0, 2 / a + 2 / b),
Assignment(s1, 2 / a + 2 / b + 2 / c),
Assignment(s2, 2 / a + 2 / b + 2 / c + 2 / d),
Assignment(s3, 2 / a + 2 / b / c),
Assignment(x, s1 + s2 + s0 + s3)
]
main = [
Assignment(f[0], s1 + s2 + s0 + s3)
]
ac = AssignmentCollection(main, subexpressions)
divs_before_optimisation = ac.operation_count["divs"]
ac = add_subexpressions_for_divisions(ac)
divs_after_optimisation = ac.operation_count["divs"]
assert divs_before_optimisation - divs_after_optimisation == 8
rhs = []
for i in range(len(ac.subexpressions)):
rhs.append(ac.subexpressions[i].rhs)
assert 1/a in rhs
assert 1/b in rhs
assert 1/c in rhs
assert 1/d in rhs
def test_add_subexpressions_for_constants():
half = sp.Rational(1,2)
sqrt_2 = sp.sqrt(2)
main = [
Assignment(f[0], half * a + half * b + half * c),
Assignment(f[1], - half * a - half * b),
Assignment(f[2], a * sqrt_2 - b * sqrt_2),
Assignment(f[3], a**2 + b**2)
]
ac = AssignmentCollection(main)
ac = add_subexpressions_for_constants(ac)
assert len(ac.subexpressions) == 2
half_subexp = None
sqrt_subexp = None
for asm in ac.subexpressions:
if asm.rhs == half:
half_subexp = asm.lhs
elif asm.rhs == sqrt_2:
sqrt_subexp = asm.lhs
else:
pytest.fail(f"An unexpected subexpression was encountered: {asm}")
assert half_subexp is not None
assert sqrt_subexp is not None
for asm in ac.main_assignments[:3]:
assert isinstance(asm.rhs, sp.Mul)
assert any(arg == half_subexp for arg in ac.main_assignments[0].rhs.args)
assert any(arg == half_subexp for arg in ac.main_assignments[1].rhs.args)
assert any(arg == sqrt_subexp for arg in ac.main_assignments[2].rhs.args)
# Do not replace exponents!
assert ac.main_assignments[3].rhs == a**2 + b**2
def test_add_subexpressions_for_sums():
subexpressions = [
Assignment(s0, a + b + c + d),
Assignment(s1, 3 * a * sp.sqrt(x) + 4 * b + c),
Assignment(s2, 3 * a * sp.sqrt(x) + 4 * b + c),
Assignment(s3, 3 * a * sp.sqrt(x) + 4 * b + c)
]
main = [
Assignment(f[0], s1 + s2 + s0 + s3)
]
ac = AssignmentCollection(main, subexpressions)
ops_before_optimisation = ac.operation_count
ac = add_subexpressions_for_sums(ac)
ops_after_optimisation = ac.operation_count
assert ops_after_optimisation["adds"] == ops_before_optimisation["adds"]
assert ops_after_optimisation["muls"] < ops_before_optimisation["muls"]
assert ops_after_optimisation["sqrts"] < ops_before_optimisation["sqrts"]
rhs = []
for i in range(len(ac.subexpressions)):
rhs.append(ac.subexpressions[i].rhs)
assert a + b + c + d in rhs
assert 3 * a * sp.sqrt(x) in rhs
def test_add_subexpressions_for_field_reads():
s, v = fields("s(5), v(5): double[2D]")
subexpressions = []
main = [Assignment(s[0, 0](0), 3 * v[0, 0](0)),
Assignment(s[0, 0](1), 10 * v[0, 0](1))]
ac = AssignmentCollection(main, subexpressions)
assert len(ac.subexpressions) == 0
ac2 = add_subexpressions_for_field_reads(ac)
assert len(ac2.subexpressions) == 2
ac3 = add_subexpressions_for_field_reads(ac, data_type="float32")
assert len(ac3.subexpressions) == 2
assert isinstance(ac3.subexpressions[0].lhs, TypedSymbol)
assert ac3.subexpressions[0].lhs.dtype == BasicType("float32")
# added check for early out of add_subexpressions_for_field_reads is no fields appear on the rhs (See #92)
main = [Assignment(s[0, 0](0), 3.0),
Assignment(s[0, 0](1), 4.0)]
ac4 = AssignmentCollection(main, subexpressions)
assert len(ac4.subexpressions) == 0
ac5 = add_subexpressions_for_field_reads(ac4)
assert ac5 is not None
assert ac4 is ac5
@pytest.mark.parametrize('target', (ps.Target.CPU, ps.Target.GPU))
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.skipif((vs.major, vs.minor, vs.micro) == (3, 8, 2), reason="does not work on python 3.8.2 for some reason")
def test_sympy_optimizations(target, dtype):
if target == ps.Target.GPU:
pytest.importorskip("cupy")
src, dst = ps.fields(f'src, dst: {dtype}[2d]')
assignments = ps.AssignmentCollection({
src[0, 0]: 1.0 * (sp.exp(dst[0, 0]) - 1)
})
config = pystencils.config.CreateKernelConfig(target=target, default_number_float=dtype)
ast = ps.create_kernel(assignments, config=config)
ps.show_code(ast)
code = ps.get_code_str(ast)
if dtype == 'float32':
assert 'expf(' in code
elif dtype == 'float64':
assert 'exp(' in code
@pytest.mark.parametrize('target', (ps.Target.CPU, ps.Target.GPU))
@pytest.mark.parametrize('simplification', (True, False))
@pytest.mark.skipif((vs.major, vs.minor, vs.micro) == (3, 8, 2), reason="does not work on python 3.8.2 for some reason")
def test_evaluate_constant_terms(target, simplification):
if target == ps.Target.GPU:
pytest.importorskip("cupy")
src, dst = ps.fields('src, dst: float32[2d]')
# cos of a number will always be simplified
assignments = ps.AssignmentCollection({
src[0, 0]: -sp.cos(1) + dst[0, 0]
})
config = pystencils.config.CreateKernelConfig(target=target, default_assignment_simplifications=simplification)
ast = ps.create_kernel(assignments, config=config)
code = ps.get_code_str(ast)
assert 'cos(' not in code
import numpy as np
import pytest
import pystencils
import sympy as sp
from pystencils import Assignment, Field, create_kernel, fields
def test_size_check():
"""Kernel with two fixed-sized fields creating with same size but calling with wrong size"""
src = np.zeros((20, 21, 9))
dst = np.zeros_like(src)
sym_src = Field.create_from_numpy_array("src", src, index_dimensions=1)
sym_dst = Field.create_from_numpy_array("dst", dst, index_dimensions=1)
update_rule = Assignment(sym_dst(0), sym_src[-1, 1](1) + sym_src[1, -1](2))
ast = create_kernel([update_rule])
func = ast.compile()
# change size of src field
new_shape = [a - 7 for a in src.shape]
src = np.zeros(new_shape)
dst = np.zeros(new_shape)
with pytest.raises(ValueError) as e:
func(src=src, dst=dst)
assert 'Wrong shape' in str(e.value)
def test_fixed_size_mismatch_check():
"""Create kernel with two differently sized but constant fields """
src = np.zeros((20, 21, 9))
dst = np.zeros((21, 21, 9))
sym_src = Field.create_from_numpy_array("src", src, index_dimensions=1)
sym_dst = Field.create_from_numpy_array("dst", dst, index_dimensions=1)
update_rule = Assignment(sym_dst(0),
sym_src[-1, 1](1) + sym_src[1, -1](2))
with pytest.raises(ValueError) as e:
create_kernel([update_rule])
assert 'Differently sized field accesses' in str(e.value)
def test_fixed_and_variable_field_check():
"""Create kernel with two variable sized fields - calling them with different sizes"""
src = np.zeros((20, 21, 9))
sym_src = Field.create_from_numpy_array("src", src, index_dimensions=1)
sym_dst = Field.create_generic("dst", spatial_dimensions=2, index_dimensions=1)
update_rule = Assignment(sym_dst(0),
sym_src[-1, 1](1) + sym_src[1, -1](2))
with pytest.raises(ValueError) as e:
create_kernel(update_rule)
assert 'Mixing fixed-shaped and variable-shape fields' in str(e.value)
def test_two_variable_shaped_fields():
src = np.zeros((20, 21, 9))
dst = np.zeros((22, 21, 9))
sym_src = Field.create_generic("src", spatial_dimensions=2, index_dimensions=1)
sym_dst = Field.create_generic("dst", spatial_dimensions=2, index_dimensions=1)
update_rule = Assignment(sym_dst(0),
sym_src[-1, 1](1) + sym_src[1, -1](2))
ast = create_kernel([update_rule])
func = ast.compile()
with pytest.raises(TypeError) as e:
func(src=src, dst=dst)
assert 'must have same' in str(e.value)
def test_ssa_checks():
f, g = fields("f, g : double[2D]")
a, b, c = sp.symbols("a b c")
with pytest.raises(ValueError) as e:
create_kernel([Assignment(c, f[0, 1]),
Assignment(c, f[1, 0]),
Assignment(g[0, 0], c)])
assert 'Assignments not in SSA form' in str(e.value)
with pytest.raises(ValueError) as e:
create_kernel([Assignment(c, a + 3),
Assignment(a, 42),
Assignment(g[0, 0], c)])
assert 'Symbol a is written, after it has been read' in str(e.value)
with pytest.raises(ValueError) as e:
create_kernel([Assignment(c, c + 1),
Assignment(g[0, 0], c)])
assert 'Symbol c is written, after it has been read' in str(e.value)
def test_loop_independence_checks():
f, g = fields("f, g : double[2D]")
v = fields("v(2) : double[2D]")
with pytest.raises(ValueError) as e:
create_kernel([Assignment(g[0, 1], f[0, 1]),
Assignment(g[0, 0], f[1, 0])])
assert 'Field g is written at two different locations' in str(e.value)
# This is not allowed - because this is not SSA (it can be overwritten with allow_double_writes)
with pytest.raises(ValueError) as e:
create_kernel([Assignment(g[0, 2], f[0, 1]),
Assignment(g[0, 2], 2 * g[0, 2])])
# This is allowed - because allow_double_writes is True now
create_kernel([Assignment(g[0, 2], f[0, 1]),
Assignment(g[0, 2], 2 * g[0, 2])],
config=pystencils.CreateKernelConfig(allow_double_writes=True))
with pytest.raises(ValueError) as e:
create_kernel([Assignment(v[0, 2](1), f[0, 1]),
Assignment(v[0, 1](0), 4),
Assignment(v[0, 2](1), 2 * v[0, 2](1))])
with pytest.raises(ValueError) as e:
create_kernel([Assignment(g[0, 1], 3),
Assignment(f[0, 1], 2 * g[0, 2])])
assert 'Field g is read at (0, 2) and written at (0, 1)' in str(e.value)
import numpy as np
import sympy as sp
import pytest
from pystencils import (
Assignment,
Field,
TypedSymbol,
create_kernel,
make_slice,
Target,
create_data_handling,
)
from pystencils.simp import sympy_cse_on_assignment_list
@pytest.mark.parametrize("target", [Target.CPU, Target.GPU])
def test_sliced_iteration(target):
if target == Target.GPU:
pytest.importorskip("cupy")
size = (4, 4)
dh = create_data_handling(size, default_target=target, default_ghost_layers=0)
src_field = dh.add_array("src", 1)
dst_field = dh.add_array("dst", 1)
dh.fill(src_field.name, 1.0, ghost_layers=True)
dh.fill(dst_field.name, 0.0, ghost_layers=True)
a, b = sp.symbols("a b")
update_rule = Assignment(
dst_field[0, 0],
(
a * src_field[0, 1]
+ a * src_field[0, -1]
+ b * src_field[1, 0]
+ b * src_field[-1, 0]
)
/ 4,
)
s = make_slice[1:3, 1]
kernel = create_kernel(
sympy_cse_on_assignment_list([update_rule]), iteration_slice=s, target=target
).compile()
if target == Target.GPU:
dh.all_to_gpu()
dh.run_kernel(kernel, a=1.0, b=1.0)
if target == Target.GPU:
dh.all_to_cpu()
expected_result = np.zeros(size)
expected_result[1:3, 1] = 1
np.testing.assert_almost_equal(dh.gather_array(dst_field.name), expected_result)
@pytest.mark.parametrize("target", [Target.CPU, Target.GPU])
def test_symbols_in_slice(target):
if target == Target.GPU:
pytest.xfail("Iteration slices including arbitrary symbols are currently broken on GPU")
size = (4, 4)
dh = create_data_handling(size, default_target=target, default_ghost_layers=0)
src_field = dh.add_array("src", 1)
dst_field = dh.add_array("dst", 1)
dh.fill(src_field.name, 1.0, ghost_layers=True)
dh.fill(dst_field.name, 0.0, ghost_layers=True)
a, b = sp.symbols("a b")
update_rule = Assignment(
dst_field[0, 0],
(
a * src_field[0, 1]
+ a * src_field[0, -1]
+ b * src_field[1, 0]
+ b * src_field[-1, 0]
)
/ 4,
)
x_end = TypedSymbol("x_end", "int")
s = make_slice[1:x_end, 1]
x_end_value = size[1] - 1
kernel = create_kernel(
sympy_cse_on_assignment_list([update_rule]), iteration_slice=s, target=target
).compile()
if target == Target.GPU:
dh.all_to_gpu()
dh.run_kernel(kernel, a=1.0, b=1.0, x_end=x_end_value)
if target == Target.GPU:
dh.all_to_cpu()
expected_result = np.zeros(size)
expected_result[1:x_end_value, 1] = 1
np.testing.assert_almost_equal(dh.gather_array(dst_field.name), expected_result)
import numpy as np
from numpy.testing import assert_array_equal
from pystencils import create_data_handling
from pystencils.slicing import SlicedGetter, make_slice, SlicedGetterDataHandling, shift_slice, slice_intersection
def test_sliced_getter():
def get_slice(slice_obj=None):
arr = np.ones((10, 10))
if slice_obj is None:
slice_obj = make_slice[:, :]
return arr[slice_obj]
sli = SlicedGetter(get_slice)
test = make_slice[2:-2, 2:-2]
assert sli[test].shape == (6, 6)
def test_sliced_getter_data_handling():
domain_shape = (10, 10)
dh = create_data_handling(domain_size=domain_shape, default_ghost_layers=1)
dh.add_array("src", values_per_cell=1)
dh.fill("src", 1.0, ghost_layers=True)
dh.add_array("dst", values_per_cell=1)
dh.fill("dst", 0.0, ghost_layers=True)
sli = SlicedGetterDataHandling(dh, 'dst')
slice_obj = make_slice[2:-2, 2:-2]
assert np.sum(sli[slice_obj]) == 0
sli = SlicedGetterDataHandling(dh, 'src')
slice_obj = make_slice[2:-2, 2:-2]
assert np.sum(sli[slice_obj]) == 36
def test_shift_slice():
sh = shift_slice(make_slice[2:-2, 2:-2], [1, 2])
assert sh[0] == slice(3, -1, None)
assert sh[1] == slice(4, 0, None)
sh = shift_slice(make_slice[2:-2, 2:-2], 1)
assert sh[0] == slice(3, -1, None)
assert sh[1] == slice(3, -1, None)
sh = shift_slice([2, 4], 1)
assert sh[0] == 3
assert sh[1] == 5
sh = shift_slice([2, None], 1)
assert sh[0] == 3
assert sh[1] is None
sh = shift_slice([1.5, 1.5], 1)
assert sh[0] == 1.5
assert sh[1] == 1.5
def test_shifted_array_access():
arr = np.array(range(10))
sh = make_slice[2:5]
assert_array_equal(arr[sh], [2,3,4])
sh = shift_slice(sh, 3)
assert_array_equal(arr[sh], [5,6,7])
arr = np.array([
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
])
sh = make_slice[0:2, 0:2]
assert_array_equal(arr[sh], [[1, 2], [4, 5]])
sh = shift_slice(sh, (1,1))
assert_array_equal(arr[sh], [[5, 6], [8, 9]])
def test_slice_intersection():
sl1 = make_slice[1:10, 1:10]
sl2 = make_slice[5:15, 5:15]
intersection = slice_intersection(sl1, sl2)
assert intersection[0] == slice(5, 10, None)
assert intersection[1] == slice(5, 10, None)
sl2 = make_slice[12:15, 12:15]
intersection = slice_intersection(sl1, sl2)
assert intersection is None
%% Cell type:code id: tags:
``` python
import pytest
pytest.importorskip('waLBerla')
```
%% Output
<module 'waLBerla' from '/Users/holzer/walberla/python/waLBerla/__init__.py'>
%% Cell type:code id: tags:
``` python
from pystencils.session import *
from time import perf_counter
from statistics import median
from functools import partial
```
%% Cell type:markdown id: tags:
## Benchmark for Python call overhead
%% Cell type:code id: tags:
``` python
inner_repeats = 100
outer_repeats = 5
sizes = [2**i for i in range(1, 8)]
sizes
```
%% Output
$\displaystyle \left[ 2, \ 4, \ 8, \ 16, \ 32, \ 64, \ 128\right]$
[2, 4, 8, 16, 32, 64, 128]
%% Cell type:code id: tags:
``` python
def benchmark_pure(domain_size, extract_first=False):
src = np.zeros(domain_size)
dst = np.zeros_like(src)
f_src, f_dst = ps.fields("src, dst", src=src, dst=dst)
kernel = ps.create_kernel(ps.Assignment(f_dst.center, f_src.center)).compile()
if extract_first:
kernel = kernel.kernel
start = perf_counter()
for i in range(inner_repeats):
kernel(src=src, dst=dst)
src, dst = dst, src
end = perf_counter()
else:
start = perf_counter()
for i in range(inner_repeats):
kernel(src=src, dst=dst)
src, dst = dst, src
end = perf_counter()
return (end - start) / inner_repeats
def benchmark_datahandling(domain_size, parallel=False):
dh = ps.create_data_handling(domain_size, parallel=parallel)
f_src = dh.add_array('src')
f_dst = dh.add_array('dst')
kernel = ps.create_kernel(ps.Assignment(f_dst.center, f_src.center)).compile()
start = perf_counter()
for i in range(inner_repeats):
dh.run_kernel(kernel)
dh.swap('src', 'dst')
end = perf_counter()
return (end - start) / inner_repeats
name_to_func = {
'pure_extract': partial(benchmark_pure, extract_first=True),
'pure_no_extract': partial(benchmark_pure, extract_first=False),
'dh_serial': partial(benchmark_datahandling, parallel=False),
'dh_parallel': partial(benchmark_datahandling, parallel=True),
}
```
%% Cell type:code id: tags:
``` python
result = {'block_size': [],
'name': [],
'time': []}
for bs in sizes:
print("Computing size ", bs)
for name, func in name_to_func.items():
for i in range(outer_repeats):
time = func((bs, bs))
result['block_size'].append(bs)
result['name'].append(name)
result['time'].append(time)
```
%% Output
Computing size 2
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
/var/folders/07/0d7kq8fd0sx24cs53zz90_qc0000gp/T/ipykernel_12649/2009975470.py in <module>
7 for name, func in name_to_func.items():
8 for i in range(outer_repeats):
----> 9 time = func((bs, bs))
10 result['block_size'].append(bs)
11 result['name'].append(name)
/var/folders/07/0d7kq8fd0sx24cs53zz90_qc0000gp/T/ipykernel_12649/3509370390.py in benchmark_datahandling(domain_size, parallel)
20
21 def benchmark_datahandling(domain_size, parallel=False):
---> 22 dh = ps.create_data_handling(domain_size, parallel=parallel)
23 f_src = dh.add_array('src')
24 f_dst = dh.add_array('dst')
~/pystencils/pystencils/pystencils/datahandling/__init__.py in create_data_handling(domain_size, periodicity, default_layout, default_target, parallel, default_ghost_layers)
44 if parallel:
45 if wlb is None:
---> 46 raise ValueError("Cannot create parallel data handling because walberla module is not available")
47
48 if periodicity is False or periodicity is None:
ValueError: Cannot create parallel data handling because walberla module is not available
%% Cell type:code id: tags:
``` python
if 'is_test_run' not in globals():
import pandas as pd
import seaborn as sns
data = pd.DataFrame.from_dict(result)
plt.subplot(1,2,1)
sns.barplot(x='block_size', y='time', hue='name', data=data, alpha=0.6)
plt.yscale('log')
plt.subplot(1,2,2)
data = pd.DataFrame.from_dict(result)
sns.barplot(x='block_size', y='time', hue='name', data=data, alpha=0.6)
```
# -*- coding: utf-8 -*-
#
# Copyright © 2019 Stephan Seitz <stephan.seitz@fau.de>
#
# Distributed under terms of the GPLv3 license.
"""
"""
import pystencils
import pystencils.astnodes
import pystencils.config
def test_source_code_comment():
a, b = pystencils.fields('a,b: float[2D]')
assignments = pystencils.AssignmentCollection(
{a.center(): b[0, 2] + b[0, 0]}, {}
)
config = pystencils.config.CreateKernelConfig(target=pystencils.Target.CPU)
ast = pystencils.create_kernel(assignments, config=config)
ast.body.append(pystencils.astnodes.SourceCodeComment("Hallo"))
ast.body.append(pystencils.astnodes.EmptyLine())
ast.body.append(pystencils.astnodes.SourceCodeComment("World!"))
print(ast)
compiled = ast.compile()
assert compiled is not None
pystencils.show_code(ast)
import numpy as np
import sympy as sp
import pytest
import pystencils as ps
from pystencils import x_staggered_vector, TypedSymbol
from pystencils.enums import Target
class TestStaggeredDiffusion:
def _run(self, num_neighbors, target=ps.Target.CPU, openmp=False):
L = (40, 40)
D = 0.066
dt = 1
T = 100
dh = ps.create_data_handling(L, periodicity=True, default_target=target)
c = dh.add_array('c', values_per_cell=1)
j = dh.add_array('j', values_per_cell=num_neighbors, field_type=ps.FieldType.STAGGERED_FLUX)
x_staggered = - c[-1, 0] + c[0, 0]
y_staggered = - c[0, -1] + c[0, 0]
xy_staggered = - c[-1, -1] + c[0, 0]
xY_staggered = - c[-1, 1] + c[0, 0]
jj = j.staggered_access
divergence = -1 * D / (1 + sp.sqrt(2) if j.index_shape[0] == 4 else 1) * \
sum([jj(d) / sp.Matrix(ps.stencil.direction_string_to_offset(d)).norm() for d in j.staggered_stencil
+ [ps.stencil.inverse_direction_string(d) for d in j.staggered_stencil]])
update = [ps.Assignment(c.center, c.center + dt * divergence)]
flux = [ps.Assignment(j.staggered_access("W"), x_staggered),
ps.Assignment(j.staggered_access("S"), y_staggered)]
if j.index_shape[0] == 4:
flux += [ps.Assignment(j.staggered_access("SW"), xy_staggered),
ps.Assignment(j.staggered_access("NW"), xY_staggered)]
staggered_kernel = ps.create_staggered_kernel(flux, target=dh.default_target, cpu_openmp=openmp).compile()
div_kernel = ps.create_kernel(update, target=dh.default_target, cpu_openmp=openmp).compile()
def time_loop(steps):
sync = dh.synchronization_function([c.name])
dh.all_to_gpu()
for i in range(steps):
sync()
dh.run_kernel(staggered_kernel)
dh.run_kernel(div_kernel)
dh.all_to_cpu()
def init():
dh.fill(c.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(c.name, 0)
dh.fill(j.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.cpu_arrays[c.name][L[0] // 2:L[0] // 2 + 2, L[1] // 2:L[1] // 2 + 2] = 1.0
init()
time_loop(T)
reference = np.empty(L)
for x in range(L[0]):
for y in range(L[1]):
r = np.array([x, y]) - L[0] / 2 + 0.5
reference[x, y] = (4 * np.pi * D * T)**(-dh.dim / 2) * np.exp(-np.dot(r, r) / (4 * D * T)) * (2**dh.dim)
assert np.abs(dh.gather_array(c.name) - reference).max() < 5e-4
def test_diffusion_2(self):
self._run(2)
def test_diffusion_4(self):
self._run(4)
def test_diffusion_openmp(self):
self._run(4, openmp=True)
def test_staggered_subexpressions():
dh = ps.create_data_handling((10, 10), periodicity=True, default_target=Target.CPU)
j = dh.add_array('j', values_per_cell=2, field_type=ps.FieldType.STAGGERED)
c = sp.symbols("c")
assignments = [ps.Assignment(j.staggered_access("W"), c),
ps.Assignment(c, 1)]
ps.create_staggered_kernel(assignments, target=dh.default_target).compile()
def test_staggered_loop_cutting():
pytest.importorskip('islpy')
dh = ps.create_data_handling((4, 4), periodicity=True, default_target=Target.CPU)
j = dh.add_array('j', values_per_cell=4, field_type=ps.FieldType.STAGGERED)
assignments = [ps.Assignment(j.staggered_access("SW"), 1)]
ast = ps.create_staggered_kernel(assignments, target=dh.default_target)
assert not ast.atoms(ps.astnodes.Conditional)
def test_staggered_vector():
dim = 2
v = x_staggered_vector(dim)
ctr0 = TypedSymbol('ctr_0', 'int', nonnegative=True)
ctr1 = TypedSymbol('ctr_1', 'int', nonnegative=True)
expected_result = sp.Matrix(tuple((ctr0 + 0.5, ctr1 + 0.5)))
assert v == expected_result
\ No newline at end of file
%% Cell type:code id: tags:
``` python
import pystencils as ps
import sympy as sp
from pystencils.stencil import coefficient_list, plot_expression, plot
```
%% Cell type:code id: tags:
``` python
sten = ((0, 0, 0), (1, 0, 0), (0, 1, 0), (0, 0, 1),
(-1, 0, 0), (0, -1, 0), (0, 0, -1))
plot(sten)
```
%% Output
import pystencils as ps
import sympy as sp
from pystencils.stencil import coefficient_list, plot_expression
import pystencils.plot as plt
def test_coefficient_list():
f = ps.fields("f: double[1D]")
expr = 2 * f[1] + 3 * f[-1]
coff = coefficient_list(expr)
assert coff == [3, 0, 2]
figure = plt.figure()
plot_expression(expr, matrix_form=True, figure=figure)
f = ps.fields("f: double[3D]")
expr = 2 * f[1, 0, 0] + 3 * f[0, -1, 0]
coff = coefficient_list(expr)
assert coff == [[[0, 3, 0], [0, 0, 2], [0, 0, 0]]]
expr = 2 * f[1, 0, 0] + 3 * f[0, -1, 0] + 4 * f[0, 0, 1]
coff = coefficient_list(expr, matrix_form=True)
assert coff[0] == sp.zeros(3, 3)
# in 3D plot only works if there are entries on every of the three 2D planes. In the above examples z-1 was empty
expr = 2 * f[1, 0, 0] + 1 * f[0, -1, 0] + 1 * f[0, 0, 1] + f[0, 0, -1]
figure = plt.figure()
plot_expression(expr, figure=figure)
def test_plot_expression():
f = ps.fields("f: double[2D]")
figure = plt.figure()
plot_expression(2 * f[1, 0] + 3 * f[0, -1], matrix_form=True, figure=figure)
import numpy as np
from pystencils import Assignment, Field, create_kernel
def test_fixed_sized_field():
for order in ('f', 'c'):
for align in (True, False):
dt = np.dtype([('e1', np.float32), ('e2', np.double), ('e3', np.double)], align=align)
arr = np.zeros((3, 2), dtype=dt, order=order)
f = Field.create_from_numpy_array("f", arr)
d = Field.create_from_numpy_array("d", arr)
update_rules = [Assignment(d[0, 0]['e2'], f[0, 0]['e3'])]
result = arr.copy(order=order)
assert result.strides == arr.strides
arr['e2'] = 0
arr['e3'] = np.random.rand(3, 2)
kernel = create_kernel(update_rules).compile()
kernel(f=arr, d=result)
np.testing.assert_almost_equal(result['e2'], arr['e3'])
np.testing.assert_equal(arr['e2'], np.zeros((3, 2)))
def test_variable_sized_field():
for order in ('f', 'c'):
for align in (True, False):
dt = np.dtype([('e1', np.float32), ('e2', np.double), ('e3', np.double)], align=align)
f = Field.create_generic("f", 2, dt, layout=order)
d = Field.create_generic("d", 2, dt, layout=order)
update_rules = [Assignment(d[0, 0]['e2'], f[0, 0]['e3'])]
arr = np.zeros((3, 2), dtype=dt, order=order)
result = arr.copy(order=order)
arr['e2'] = 0
arr['e3'] = np.random.rand(3, 2)
kernel = create_kernel(update_rules).compile()
kernel(f=arr, d=result)
np.testing.assert_almost_equal(result['e2'], arr['e3'])
np.testing.assert_equal(arr['e2'], np.zeros((3, 2)))
from pystencils import fields, Assignment, AssignmentCollection
from pystencils.simp.subexpression_insertion import *
def test_subexpression_insertion():
f, g = fields('f(10), g(10) : [2D]')
xi = sp.symbols('xi_:10')
xi_set = set(xi)
subexpressions = [
Assignment(xi[0], -f(4)),
Assignment(xi[1], -(f(1) * f(2))),
Assignment(xi[2], 2.31 * f(5)),
Assignment(xi[3], 1.8 + f(5) + f(6)),
Assignment(xi[4], 5.7 + f(6)),
Assignment(xi[5], (f(4) + f(5))**2),
Assignment(xi[6], f(3)**2),
Assignment(xi[7], f(4)),
Assignment(xi[8], 13),
Assignment(xi[9], 0),
]
assignments = [Assignment(g(i), x) for i, x in enumerate(xi)]
ac = AssignmentCollection(assignments, subexpressions=subexpressions)
ac_ins = insert_symbol_times_minus_one(ac)
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[0]})
ac_ins = insert_constant_multiples(ac)
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[0], xi[2]})
ac_ins = insert_constant_additions(ac)
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[4]})
ac_ins = insert_squares(ac)
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[6]})
ac_ins = insert_aliases(ac)
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[7]})
ac_ins = insert_zeros(ac)
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[9]})
ac_ins = insert_constants(ac, skip={xi[9]})
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[8]})
# -*- coding: utf-8 -*-
#
# Copyright © 2019 Stephan Seitz <stephan.seitz@fau.de>
#
# Distributed under terms of the GPLv3 license.
"""
"""
import pytest
import numpy as np
import pystencils.config
import sympy as sp
import sympy.abc
import pystencils as ps
from pystencils.typing import create_type
@pytest.mark.parametrize('dtype', ["float64", "float32"])
def test_sum(dtype):
sum = sp.Sum(sp.abc.k, (sp.abc.k, 1, 100))
expanded_sum = sum.doit()
# print(sum)
# print(expanded_sum)
x = ps.fields(f'x: {dtype}[1d]')
assignments = ps.AssignmentCollection({x.center(): sum})
ast = ps.create_kernel(assignments)
code = ps.get_code_str(ast)
kernel = ast.compile()
# ps.show_code(ast)
if dtype == "float32":
assert "5050.0f;" in code
array = np.zeros((10,), dtype=dtype)
kernel(x=array)
assert np.allclose(array, int(expanded_sum) * np.ones_like(array))
@pytest.mark.parametrize('dtype', ["int32", "int64", "float64", "float32"])
def test_product(dtype):
k = ps.TypedSymbol('k', create_type(dtype))
sum = sympy.Product(k, (k, 1, 10))
expanded_sum = sum.doit()
# print(sum)
# print(expanded_sum)
x = ps.fields(f'x: {dtype}[1d]')
assignments = ps.AssignmentCollection({x.center(): sum})
config = pystencils.config.CreateKernelConfig()
ast = ps.create_kernel(assignments, config=config)
code = ps.get_code_str(ast)
kernel = ast.compile()
# print(code)
if dtype == "int64" or dtype == "int32":
assert '3628800;' in code
elif dtype == "float32":
assert '3628800.0f;' in code
else:
assert '3628800.0;' in code
array = np.zeros((10,), dtype=dtype)
kernel(x=array)
assert np.allclose(array, int(expanded_sum) * np.ones_like(array))
# TODO: See Issue !55
# def test_prod_var_limit():
#
# k = ps.TypedSymbol('k', create_type('int64'))
# limit = ps.TypedSymbol('limit', create_type('int64'))
#
# sum = sympy.Sum(k, (k, 1, limit))
# expanded_sum = sum.replace(limit, 100).doit()
#
# print(sum)
# print(expanded_sum)
#
# x = ps.fields('x: int64[1d]')
#
# assignments = ps.AssignmentCollection({x.center(): sum})
#
# ast = ps.create_kernel(assignments)
# ps.show_code(ast)
# kernel = ast.compile()
#
# array = np.zeros((10,), np.int64)
#
# kernel(x=array, limit=100)
#
# assert np.allclose(array, int(expanded_sum) * np.ones_like(array))
import sympy
import numpy as np
import sympy as sp
import pystencils
from pystencils.sympyextensions import replace_second_order_products
from pystencils.sympyextensions import remove_higher_order_terms
from pystencils.sympyextensions import complete_the_squares_in_exp
from pystencils.sympyextensions import extract_most_common_factor
from pystencils.sympyextensions import simplify_by_equality
from pystencils.sympyextensions import count_operations
from pystencils.sympyextensions import common_denominator
from pystencils.sympyextensions import get_symmetric_part
from pystencils.sympyextensions import scalar_product
from pystencils.sympyextensions import kronecker_delta
from pystencils import Assignment
from pystencils.functions import DivFunc
from pystencils.fast_approximation import (fast_division, fast_inv_sqrt, fast_sqrt,
insert_fast_divisions, insert_fast_sqrts)
def test_utility():
a = [1, 2]
b = (2, 3)
a_np = np.array(a)
b_np = np.array(b)
assert scalar_product(a, b) == np.dot(a_np, b_np)
a = sympy.Symbol("a")
b = sympy.Symbol("b")
assert kronecker_delta(a, a, a, b) == 0
assert kronecker_delta(a, a, a, a) == 1
assert kronecker_delta(3, 3, 3, 2) == 0
assert kronecker_delta(2, 2, 2, 2) == 1
assert kronecker_delta([10] * 100) == 1
assert kronecker_delta((0, 1), (0, 1)) == 1
def test_replace_second_order_products():
x, y = sympy.symbols('x y')
expr = 4 * x * y
expected_expr_positive = 2 * ((x + y) ** 2 - x ** 2 - y ** 2)
expected_expr_negative = 2 * (-(x - y) ** 2 + x ** 2 + y ** 2)
result = replace_second_order_products(expr, search_symbols=[x, y], positive=True)
assert result == expected_expr_positive
assert (result - expected_expr_positive).simplify() == 0
result = replace_second_order_products(expr, search_symbols=[x, y], positive=False)
assert result == expected_expr_negative
assert (result - expected_expr_negative).simplify() == 0
result = replace_second_order_products(expr, search_symbols=[x, y], positive=None)
assert result == expected_expr_positive
a = [Assignment(sympy.symbols('z'), x + y)]
replace_second_order_products(expr, search_symbols=[x, y], positive=True, replace_mixed=a)
assert len(a) == 2
assert replace_second_order_products(4 + y, search_symbols=[x, y]) == y + 4
def test_remove_higher_order_terms():
x, y = sympy.symbols('x y')
expr = sympy.Mul(x, y)
result = remove_higher_order_terms(expr, order=1, symbols=[x, y])
assert result == 0
result = remove_higher_order_terms(expr, order=2, symbols=[x, y])
assert result == expr
expr = sympy.Pow(x, 3)
result = remove_higher_order_terms(expr, order=2, symbols=[x, y])
assert result == 0
result = remove_higher_order_terms(expr, order=3, symbols=[x, y])
assert result == expr
def test_complete_the_squares_in_exp():
a, b, c, s, n = sympy.symbols('a b c s n')
expr = a * s ** 2 + b * s + c
result = complete_the_squares_in_exp(expr, symbols_to_complete=[s])
assert result == expr
expr = sympy.exp(a * s ** 2 + b * s + c)
expected_result = sympy.exp(a*s**2 + c - b**2 / (4*a))
result = complete_the_squares_in_exp(expr, symbols_to_complete=[s])
assert result == expected_result
def test_extract_most_common_factor():
x, y = sympy.symbols('x y')
expr = 1 / (x + y) + 3 / (x + y) + 3 / (x + y)
most_common_factor = extract_most_common_factor(expr)
assert most_common_factor[0] == 7
assert sympy.prod(most_common_factor) == expr
expr = 1 / x + 3 / (x + y) + 3 / y
most_common_factor = extract_most_common_factor(expr)
assert most_common_factor[0] == 3
assert sympy.prod(most_common_factor) == expr
expr = 1 / x
most_common_factor = extract_most_common_factor(expr)
assert most_common_factor[0] == 1
assert sympy.prod(most_common_factor) == expr
assert most_common_factor[1] == expr
def test_count_operations():
x, y, z = sympy.symbols('x y z')
expr = 1/x + y * sympy.sqrt(z)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 1
assert ops['muls'] == 1
assert ops['divs'] == 1
assert ops['sqrts'] == 1
expr = 1 / sympy.sqrt(z)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 0
assert ops['muls'] == 0
assert ops['divs'] == 1
assert ops['sqrts'] == 1
expr = sympy.Rel(1 / sympy.sqrt(z), 5)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 0
assert ops['muls'] == 0
assert ops['divs'] == 1
assert ops['sqrts'] == 1
expr = sympy.sqrt(x + y)
expr = insert_fast_sqrts(expr).atoms(fast_sqrt)
ops = count_operations(*expr, only_type=None)
assert ops['fast_sqrts'] == 1
expr = sympy.sqrt(x / y)
expr = insert_fast_divisions(expr).atoms(fast_division)
ops = count_operations(*expr, only_type=None)
assert ops['fast_div'] == 1
expr = pystencils.Assignment(sympy.Symbol('tmp'), 3 / sympy.sqrt(x + y))
expr = insert_fast_sqrts(expr).atoms(fast_inv_sqrt)
ops = count_operations(*expr, only_type=None)
assert ops['fast_inv_sqrts'] == 1
expr = sympy.Piecewise((1.0, x > 0), (0.0, True)) + y * z
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 1
expr = sympy.Pow(1/x + y * sympy.sqrt(z), 100)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 1
assert ops['muls'] == 99
assert ops['divs'] == 1
assert ops['sqrts'] == 1
expr = DivFunc(x, y)
ops = count_operations(expr, only_type=None)
assert ops['divs'] == 1
expr = DivFunc(x + z, y + z)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 2
assert ops['divs'] == 1
expr = sp.UnevaluatedExpr(sp.Mul(*[x]*100, evaluate=False))
ops = count_operations(expr, only_type=None)
assert ops['muls'] == 99
expr = DivFunc(1, sp.UnevaluatedExpr(sp.Mul(*[x]*100, evaluate=False)))
ops = count_operations(expr, only_type=None)
assert ops['divs'] == 1
assert ops['muls'] == 99
expr = DivFunc(y + z, sp.UnevaluatedExpr(sp.Mul(*[x]*100, evaluate=False)))
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 1
assert ops['divs'] == 1
assert ops['muls'] == 99
def test_common_denominator():
x = sympy.symbols('x')
expr = sympy.Rational(1, 2) + x * sympy.Rational(2, 3)
cm = common_denominator(expr)
assert cm == 6
def test_get_symmetric_part():
x, y, z = sympy.symbols('x y z')
expr = x / 9 - y ** 2 / 6 + z ** 2 / 3 + z / 3
expected_result = x / 9 - y ** 2 / 6 + z ** 2 / 3
sym_part = get_symmetric_part(expr, sympy.symbols(f'y z'))
assert sym_part == expected_result
def test_simplify_by_equality():
x, y, z = sp.symbols('x, y, z')
p, q = sp.symbols('p, q')
# Let x = y + z
expr = x * p - y * p + z * q
expr = simplify_by_equality(expr, x, y, z)
assert expr == z * p + z * q
expr = x * (p - 2 * q) + 2 * q * z
expr = simplify_by_equality(expr, x, y, z)
assert expr == x * p - 2 * q * y
expr = x * (y + z) - y * z
expr = simplify_by_equality(expr, x, y, z)
assert expr == x*y + z**2
# Let x = y + 2
expr = x * p - 2 * p
expr = simplify_by_equality(expr, x, y, 2)
assert expr == y * p
import time
import numpy as np
from pystencils import Assignment
from pystencils import create_kernel
from pystencils.datahandling import create_data_handling
from pystencils.timeloop import TimeLoop
def test_timeloop():
dh = create_data_handling(domain_size=(2, 2), periodicity=True)
pre = dh.add_array('pre_run_field', values_per_cell=1)
dh.fill("pre_run_field", 0.0, ghost_layers=True)
f = dh.add_array('field', values_per_cell=1)
dh.fill("field", 0.0, ghost_layers=True)
post = dh.add_array('post_run_field', values_per_cell=1)
dh.fill("post_run_field", 0.0, ghost_layers=True)
single_step = dh.add_array('single_step_field', values_per_cell=1)
dh.fill("single_step_field", 0.0, ghost_layers=True)
pre_assignments = Assignment(pre.center, pre.center + 1)
pre_kernel = create_kernel(pre_assignments).compile()
assignments = Assignment(f.center, f.center + 1)
kernel = create_kernel(assignments).compile()
post_assignments = Assignment(post.center, post.center + 1)
post_kernel = create_kernel(post_assignments).compile()
single_step_assignments = Assignment(single_step.center, single_step.center + 1)
single_step_kernel = create_kernel(single_step_assignments).compile()
fixed_steps = 2
timeloop = TimeLoop(steps=fixed_steps)
assert timeloop.fixed_steps == fixed_steps
def pre_run():
dh.run_kernel(pre_kernel)
def post_run():
dh.run_kernel(post_kernel)
def single_step_run():
dh.run_kernel(single_step_kernel)
timeloop.add_pre_run_function(pre_run)
timeloop.add_post_run_function(post_run)
timeloop.add_single_step_function(single_step_run)
timeloop.add_call(kernel, {'field': dh.cpu_arrays["field"]})
# the timeloop is initialised with 2 steps. This means a single time step consists of two steps.
# Therefore, we have 2 main iterations and one single step iteration in this configuration
timeloop.run(time_steps=5)
assert np.all(dh.cpu_arrays["pre_run_field"] == 1.0)
assert np.all(dh.cpu_arrays["field"] == 2.0)
assert np.all(dh.cpu_arrays["single_step_field"] == 1.0)
assert np.all(dh.cpu_arrays["post_run_field"] == 1.0)
seconds = 2
start = time.perf_counter()
timeloop.run_time_span(seconds=seconds)
end = time.perf_counter()
# This test case fails often due to time measurements. It is not a good idea to assert here
# np.testing.assert_almost_equal(seconds, end - start, decimal=2)
print("timeloop: ", seconds, " own meassurement: ", end - start)
import sympy as sp
import numpy as np
import pystencils as ps
from pystencils import fields, TypedSymbol
from pystencils.astnodes import LoopOverCoordinate, SympyAssignment
from pystencils.typing import create_type
from pystencils.transformations import (
filtered_tree_iteration, get_loop_hierarchy, get_loop_counter_symbol_hierarchy,
iterate_loops_by_depth, split_inner_loop, loop_blocking
)
from pystencils.cpu import add_pragmas
def test_loop_information():
f, g = ps.fields("f, g: double[2D]")
update_rule = ps.Assignment(g[0, 0], f[0, 0])
ast = ps.create_kernel(update_rule)
inner_loops = [loop for loop in filtered_tree_iteration(ast, LoopOverCoordinate, stop_type=SympyAssignment)
if loop.is_innermost_loop]
loop_order = []
for i in get_loop_hierarchy(inner_loops[0].args[0]):
loop_order.append(i)
assert loop_order == [0, 1]
loop_symbols = get_loop_counter_symbol_hierarchy(inner_loops[0].args[0])
assert loop_symbols == [TypedSymbol("ctr_1", create_type("int"), nonnegative=True),
TypedSymbol("ctr_0", create_type("int"), nonnegative=True)]
def test_iterate_loops_by_depth():
f, g = ps.fields("f, g: double[3D]", layout="fzyx")
x = ps.TypedSymbol('x', np.float64)
subs = [ps.Assignment(x, f[0, 0, 0])]
mains = [ps.Assignment(g[0, 0, 0], x)]
ac = ps.AssignmentCollection(mains, subexpressions=subs)
config = ps.CreateKernelConfig(cpu_blocking=(0, 16, 0))
ast = ps.create_kernel(ac, config=config)
split_inner_loop(ast, [[x], [g[0,0,0]]])
loops = list(iterate_loops_by_depth(ast, 0))
assert len(loops) == 1
assert loops[0].loop_counter_symbol.name == "_blockctr_1"
loops = list(iterate_loops_by_depth(ast, 1))
assert len(loops) == 1
assert loops[0].loop_counter_symbol.name == "ctr_2"
loops = list(iterate_loops_by_depth(ast, 2))
assert len(loops) == 1
assert loops[0].loop_counter_symbol.name == "ctr_1"
loops = list(iterate_loops_by_depth(ast, 3))
assert len(loops) == 2
assert loops[0].loop_counter_symbol.name == "ctr_0"
assert loops[1].loop_counter_symbol.name == "ctr_0"
innermost = list(iterate_loops_by_depth(ast, -1))
assert loops == innermost
def test_split_optimisation():
src, dst = fields(f"src(9), dst(9): [2D]", layout='fzyx')
stencil = ((0, 0), (0, 1), (0, -1), (-1, 0), (1, 0), (-1, 1), (1, 1), (-1, -1), (1, -1))
w = [sp.Rational(4, 9)]
w += [sp.Rational(1, 9)] * 4
w += [sp.Rational(1, 36)] * 4
cs = sp.Rational(1, 3)
subexpressions = []
main_assignements = []
rho = sp.symbols("rho")
velo = sp.symbols("u_:2")
density = 0
velocity_x = 0
velocity_y = 0
for d in stencil:
density += src[d]
velocity_x += d[0] * src[d]
velocity_y += d[1] * src[d]
subexpressions.append(ps.Assignment(rho, density))
subexpressions.append(ps.Assignment(velo[0], velocity_x))
subexpressions.append(ps.Assignment(velo[1], velocity_y))
for i, d in enumerate(stencil):
u_d = velo[0] * d[0] + velo[1] * d[1]
u_2 = velo[0] * velo[0] + velo[1] * velo[1]
expr = w[i] * rho * (1 + u_d / cs + u_d ** 2 / (2 * cs ** 2) - u_2 / (2 * cs))
main_assignements.append(ps.Assignment(dst.center_vector[i], expr))
ac = ps.AssignmentCollection(main_assignments=main_assignements, subexpressions=subexpressions)
simplification_hint = {'density': rho,
'velocity': (velo[0], velo[1]),
'split_groups': [[rho, velo[0], velo[1], dst.center_vector[0]],
[dst.center_vector[1], dst.center_vector[2]],
[dst.center_vector[3], dst.center_vector[4]],
[dst.center_vector[5], dst.center_vector[6]],
[dst.center_vector[7], dst.center_vector[8]]]}
ac.simplification_hints = simplification_hint
ast = ps.create_kernel(ac)
code = ps.get_code_str(ast)
# after the split optimisation the two for loops are split into 6
assert code.count("for") == 6
print(code)
def test_pragmas():
f, g = ps.fields("f, g: double[3D]", layout="fzyx")
x = ps.TypedSymbol('x', np.float64)
subs = [ps.Assignment(x, f[0, 0, 0])]
mains = [ps.Assignment(g[0, 0, 0], x)]
ac = ps.AssignmentCollection(mains, subexpressions=subs)
def prepend_omp_pragmas(ast):
add_pragmas(ast, ["#pragma omp for schedule(dynamic)"], nesting_depth=0)
add_pragmas(ast, ["#pragma omp simd simdlen(8)"], nesting_depth=-1)
ast_passes = [prepend_omp_pragmas]
config = ps.CreateKernelConfig(target=ps.Target.CPU, cpu_prepend_optimizations=ast_passes)
ast = ps.create_kernel(ac, config=config)
code = ps.get_code_str(ast)
assert code.find("#pragma omp for schedule(dynamic)") != -1
assert code.find("#pragma omp simd simdlen(8)") != -1
loops = [loop for loop in filtered_tree_iteration(ast, LoopOverCoordinate, stop_type=SympyAssignment)]
innermost = list(filter(lambda n: n.is_innermost_loop, loops))
assert innermost[0].prefix_lines == ["#pragma omp simd simdlen(8)"]
outermost = list(filter(lambda n: n.is_outermost_loop, loops))
assert outermost[0].prefix_lines == ["#pragma omp for schedule(dynamic)"]
from sympy.abc import a, b, c, d, e, f, g
import pystencils
from pystencils.typing import CastFunc, create_type
def test_type_interference():
x = pystencils.fields('x: float32[3d]')
assignments = pystencils.AssignmentCollection({
a: CastFunc(10, create_type('float64')),
b: CastFunc(10, create_type('uint16')),
e: 11,
c: b,
f: c + b,
d: c + b + x.center + e,
x.center: c + b + x.center,
g: a + b + d
})
ast = pystencils.create_kernel(assignments)
code = pystencils.get_code_str(ast)
# print(code)
assert 'const double a' in code
assert 'const uint16_t b' in code
assert 'const uint16_t f' in code
assert 'const int64_t e' in code
assert 'const float d = ((float)(b)) + ((float)(c)) + ((float)(e)) + ' \
'_data_x[_stride_x_0*ctr_0 + _stride_x_1*ctr_1 + _stride_x_2*ctr_2];' in code
assert '_data_x[_stride_x_0*ctr_0 + _stride_x_1*ctr_1 + _stride_x_2*ctr_2] = (' \
'(float)(b)) + ((float)(c)) + _data_x[_stride_x_0*ctr_0 + _stride_x_1*ctr_1 + _stride_x_2*ctr_2];' in code
assert 'const double g = a + ((double)(b)) + ((double)(d));' in code
import pytest
import pystencils.config
import sympy as sp
import numpy as np
import pystencils as ps
from pystencils.typing import TypedSymbol, get_type_of_expression, VectorType, collate_types, \
typed_symbols, CastFunc, PointerArithmeticFunc, PointerType, result_type, BasicType
def test_result_type():
i = np.dtype('int32')
l = np.dtype('int64')
ui = np.dtype('uint32')
ul = np.dtype('uint64')
f = np.dtype('float32')
d = np.dtype('float64')
b = np.dtype('bool')
assert result_type(i, l) == l
assert result_type(l, i) == l
assert result_type(ui, i) == i
assert result_type(ui, l) == l
assert result_type(ul, i) == i
assert result_type(ul, l) == l
assert result_type(d, f) == d
assert result_type(f, d) == d
assert result_type(i, f) == f
assert result_type(l, f) == f
assert result_type(ui, f) == f
assert result_type(ul, f) == f
assert result_type(i, d) == d
assert result_type(l, d) == d
assert result_type(ui, d) == d
assert result_type(ul, d) == d
assert result_type(b, i) == i
assert result_type(b, l) == l
assert result_type(b, ui) == ui
assert result_type(b, ul) == ul
assert result_type(b, f) == f
assert result_type(b, d) == d
@pytest.mark.parametrize('dtype', ('float64', 'float32', 'int64', 'int32', 'uint32', 'uint64'))
def test_simple_add(dtype):
constant = 1.0
if dtype[0] in 'ui':
constant = 1
f = ps.fields(f"f: {dtype}[1D]")
d = TypedSymbol("d", dtype)
test_arr = np.array([constant], dtype=dtype)
ur = ps.Assignment(f[0], f[0] + d)
ast = ps.create_kernel(ur)
code = ps.get_code_str(ast)
kernel = ast.compile()
kernel(f=test_arr, d=constant)
assert test_arr[0] == constant+constant
@pytest.mark.parametrize('dtype1', ('float64', 'float32', 'int64', 'int32', 'uint32', 'uint64'))
@pytest.mark.parametrize('dtype2', ('float64', 'float32', 'int64', 'int32', 'uint32', 'uint64'))
def test_mixed_add(dtype1, dtype2):
constant = 1
f = ps.fields(f"f: {dtype1}[1D]")
g = ps.fields(f"g: {dtype2}[1D]")
test_f = np.array([constant], dtype=dtype1)
test_g = np.array([constant], dtype=dtype2)
ur = ps.Assignment(f[0], f[0] + g[0])
# TODO Markus: check for the logging if colate_types(dtype1, dtype2) != dtype1
ast = ps.create_kernel(ur)
code = ps.get_code_str(ast)
kernel = ast.compile()
kernel(f=test_f, g=test_g)
assert test_f[0] == constant+constant
def test_collation():
double_type = BasicType('float64')
float_type = BasicType('float32')
double4_type = VectorType(double_type, 4)
float4_type = VectorType(float_type, 4)
assert collate_types([double_type, float_type]) == double_type
assert collate_types([double4_type, float_type]) == double4_type
assert collate_types([double4_type, float4_type]) == double4_type
def test_vector_type():
double_type = BasicType('float64')
float_type = BasicType('float32')
double4_type = VectorType(double_type, 4)
float4_type = VectorType(float_type, 4)
assert double4_type.item_size == 4
assert float4_type.item_size == 4
double4_type2 = VectorType(double_type, 4)
assert double4_type == double4_type2
assert double4_type != 4
assert double4_type != float4_type
def test_pointer_type():
double_type = BasicType('float64')
float_type = BasicType('float32')
double4_type = PointerType(double_type, restrict=True)
float4_type = PointerType(float_type, restrict=False)
assert double4_type.item_size == 1
assert float4_type.item_size == 1
assert not double4_type == 4
assert not double4_type.alias
assert float4_type.alias
def test_dtype_of_constants():
# Some come constants are neither of type Integer,Float,Rational and don't have args
# >>> isinstance(pi, Integer)
# False
# >>> isinstance(pi, Float)
# False
# >>> isinstance(pi, Rational)
# False
# >>> pi.args
# ()
get_type_of_expression(sp.pi)
def test_assumptions():
x = ps.fields('x: float32[3d]')
assert x.shape[0].is_nonnegative
assert (2 * x.shape[0]).is_nonnegative
assert (2 * x.shape[0]).is_integer
assert (TypedSymbol('a', BasicType('uint64'))).is_nonnegative
assert (TypedSymbol('a', BasicType('uint64'))).is_positive is None
assert (TypedSymbol('a', BasicType('uint64')) + 1).is_positive
assert (x.shape[0] + 1).is_real
@pytest.mark.parametrize('dtype', ('float64', 'float32'))
def test_sqrt_of_integer(dtype):
"""Regression test for bug where sqrt(3) was classified as integer"""
f = ps.fields(f'f: {dtype}[1D]')
tmp = sp.symbols('tmp')
assignments = [ps.Assignment(tmp, sp.sqrt(3)),
ps.Assignment(f[0], tmp)]
arr = np.array([1], dtype=dtype)
config = pystencils.config.CreateKernelConfig(data_type=dtype, default_number_float=dtype)
ast = ps.create_kernel(assignments, config=config)
kernel = ast.compile()
kernel(f=arr)
assert 1.7 < arr[0] < 1.8
code = ps.get_code_str(ast)
constant = '1.7320508075688772f'
if dtype == 'float32':
assert constant in code
else:
assert constant not in code
@pytest.mark.parametrize('dtype', ('float64', 'float32'))
def test_integer_comparision(dtype):
f = ps.fields(f"f: {dtype}[2D]")
d = TypedSymbol("dir", "int64")
ur = ps.Assignment(f[0, 0], sp.Piecewise((0, sp.Equality(d, 1)), (f[0, 0], True)))
ast = ps.create_kernel(ur)
code = ps.get_code_str(ast)
# There should be an explicit cast for the integer zero to the type of the field on the rhs
if dtype == 'float64':
t = "_data_f[_stride_f_0*ctr_0 + _stride_f_1*ctr_1] = " \
"((((dir) == (1))) ? (0.0): (_data_f[_stride_f_0*ctr_0 + _stride_f_1*ctr_1]));"
else:
t = "_data_f[_stride_f_0*ctr_0 + _stride_f_1*ctr_1] = " \
"((((dir) == (1))) ? (0.0f): (_data_f[_stride_f_0*ctr_0 + _stride_f_1*ctr_1]));"
assert t in code
def test_typed_symbols_dtype():
assert typed_symbols(("s", "f"), np.uint) == typed_symbols("s, f", np.uint)
t_symbols = typed_symbols(("s", "f"), np.uint)
s = t_symbols[0]
assert t_symbols[0] == TypedSymbol("s", np.uint)
assert s.dtype.is_uint()
assert typed_symbols("s", np.float64).dtype.c_name == 'double'
assert typed_symbols("s", np.float32).dtype.c_name == 'float'
assert TypedSymbol("s", np.uint).canonical == TypedSymbol("s", np.uint)
assert TypedSymbol("s", np.uint).reversed == TypedSymbol("s", np.uint)
def test_cast_func():
assert CastFunc(TypedSymbol("s", np.uint), np.int64).canonical == TypedSymbol("s", np.uint).canonical
a = CastFunc(5, np.uint)
assert a.is_negative is False
assert a.is_nonnegative
def test_pointer_arithmetic_func():
assert PointerArithmeticFunc(TypedSymbol("s", np.uint), 1).canonical == TypedSymbol("s", np.uint).canonical
def test_division():
f = ps.fields('f(10): float32[2D]')
m, tau = sp.symbols("m, tau")
up = [ps.Assignment(tau, 1 / (0.5 + (3.0 * m))),
ps.Assignment(f.center, tau)]
config = pystencils.config.CreateKernelConfig(data_type='float32', default_number_float='float32')
ast = ps.create_kernel(up, config=config)
code = ps.get_code_str(ast)
assert "((1.0f) / (m*3.0f + 0.5f))" in code
def test_pow():
f = ps.fields('f(10): float32[2D]')
m, tau = sp.symbols("m, tau")
up = [ps.Assignment(tau, m ** 1.5),
ps.Assignment(f.center, tau)]
config = pystencils.config.CreateKernelConfig(data_type="float32", default_number_float='float32')
ast = ps.create_kernel(up, config=config)
code = ps.get_code_str(ast)
assert "1.5f" in code