Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Showing
with 1771 additions and 16 deletions
import os import os
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
import shutil
import pytest
import numpy as np import numpy as np
import pystencils.plot as plt import pystencils.plot as plt
...@@ -18,6 +23,7 @@ def example_vector_field(t=0, shape=(40, 40)): ...@@ -18,6 +23,7 @@ def example_vector_field(t=0, shape=(40, 40)):
return result return result
@pytest.mark.skipif(shutil.which('ffmpeg') is None, reason="ffmpeg not available")
def test_animation(): def test_animation():
t = 0 t = 0
......
import pytest
import re
import sympy as sp
import pystencils
from pystencils.backends.cbackend import CBackend
class UnsupportedNode(pystencils.astnodes.Node):
def __init__(self):
super().__init__()
@pytest.mark.parametrize('type', ('float32', 'float64', 'int64'))
@pytest.mark.parametrize('negative', (False, 'Negative'))
@pytest.mark.parametrize('target', (pystencils.Target.CPU, pystencils.Target.GPU))
def test_print_infinity(type, negative, target):
x = pystencils.fields(f'x: {type}[1d]')
if negative:
assignment = pystencils.Assignment(x.center, -sp.oo)
else:
assignment = pystencils.Assignment(x.center, sp.oo)
ast = pystencils.create_kernel(assignment, data_type=type, target=target)
if target == pystencils.Target.GPU:
pytest.importorskip('cupy')
ast.compile()
print(ast.compile().code)
def test_print_unsupported_node():
with pytest.raises(NotImplementedError, match='CBackend does not support node of type UnsupportedNode'):
CBackend()(UnsupportedNode())
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('target', (pystencils.Target.CPU, pystencils.Target.GPU))
def test_print_subtraction(dtype, target):
a, b, c = sp.symbols("a b c")
x = pystencils.fields(f'x: {dtype}[3d]')
y = pystencils.fields(f'y: {dtype}[3d]')
config = pystencils.CreateKernelConfig(target=target, data_type=dtype)
update = pystencils.Assignment(x.center, y.center - a * b ** 8 + b * -1 / 42.0 - 2 * c ** 4)
ast = pystencils.create_kernel(update, config=config)
code = pystencils.get_code_str(ast)
assert "-1.0" not in code
def test_print_small_integer_pow():
printer = pystencils.backends.cbackend.CBackend()
x = sp.Symbol("x")
y = sp.Symbol("y")
n = pystencils.TypedSymbol("n", "int")
t = pystencils.TypedSymbol("t", "float32")
s = pystencils.TypedSymbol("s", "float32")
equs = [
pystencils.astnodes.SympyAssignment(y, 1/x),
pystencils.astnodes.SympyAssignment(y, x*x),
pystencils.astnodes.SympyAssignment(y, 1/(x*x)),
pystencils.astnodes.SympyAssignment(y, x**8),
pystencils.astnodes.SympyAssignment(y, x**(-8)),
pystencils.astnodes.SympyAssignment(y, x**9),
pystencils.astnodes.SympyAssignment(y, x**(-9)),
pystencils.astnodes.SympyAssignment(y, x**n),
pystencils.astnodes.SympyAssignment(y, sp.Pow(4, 4, evaluate=False)),
pystencils.astnodes.SympyAssignment(y, x**0.25),
pystencils.astnodes.SympyAssignment(y, x**y),
pystencils.astnodes.SympyAssignment(y, pystencils.typing.cast_functions.CastFunc(1/x, "float32")),
pystencils.astnodes.SympyAssignment(y, pystencils.typing.cast_functions.CastFunc(x*x, "float32")),
pystencils.astnodes.SympyAssignment(y, (t+s)**(-8)),
pystencils.astnodes.SympyAssignment(y, (t+s)**(-9)),
]
typed = pystencils.typing.transformations.add_types(equs, pystencils.CreateKernelConfig())
regexes = [
r"1\.0\s*/\s*\(?\s*x\s*\)?",
r"x\s*\*\s*x",
r"1\.0\s*/\s*\(\s*x\s*\*x\s*\)",
r"x(\s*\*\s*x){7}",
r"1\.0\s*/\s*\(\s*x(\s*\*\s*x){7}\s*\)",
r"pow\(\s*x\s*,\s*9(\.0)?\s*\)",
r"pow\(\s*x\s*,\s*-9(\.0)?\s*\)",
r"pow\(\s*x\s*,\s*\(?\s*\(\s*double\s*\)\s*\(\s*n\s*\)\s*\)?\s*\)",
r"\(\s*int[a-zA-Z0-9_]*\s*\)\s*\(+\s*4(\s*\*\s*4){3}\s*\)+",
r"pow\(\s*x\s*,\s*0\.25\s*\)",
r"pow\(\s*x\s*,\s*y\s*\)",
r"\(\s*float\s*\)[ ()]*1\.0\s*/\s*\(?\s*x\s*\)?",
r"\(\s*float\s*\)[ ()]*x\s*\*\s*x",
r"\(\s*float\s*\)\s*\(\s*1\.0f\s*/\s*\(\s*\(\s*s\s*\+\s*t\s*\)(\s*\*\s*\(\s*s\s*\+\s*t\s*\)){7}\s*\)",
r"powf\(\s*s\s*\+\s*t\s*,\s*-9\.0f\s*\)",
]
for r, e in zip(regexes, typed):
assert re.search(r, printer(e))
import numpy as np
import pystencils as ps
from pystencils.cpu.vectorization import get_supported_instruction_sets
from pystencils.cpu.vectorization import replace_inner_stride_with_one, vectorize
def test_basic_kernel():
for domain_shape in [(4, 5), (3, 4, 5)]:
dh = ps.create_data_handling(domain_size=domain_shape, periodicity=True)
assert all(dh.periodicity)
f = dh.add_array('f', values_per_cell=1)
tmp = dh.add_array('tmp', values_per_cell=1)
stencil_2d = [(1, 0), (-1, 0), (0, 1), (0, -1)]
stencil_3d = [(1, 0, 0), (-1, 0, 0), (0, 1, 0), (0, -1, 0), (0, 0, 1), (0, 0, -1)]
stencil = stencil_2d if dh.dim == 2 else stencil_3d
jacobi = ps.Assignment(tmp.center, sum(f.neighbors(stencil)) / len(stencil))
kernel = ps.create_kernel(jacobi).compile()
for b in dh.iterate(ghost_layers=1):
b['f'].fill(42)
dh.run_kernel(kernel)
for b in dh.iterate(ghost_layers=0):
np.testing.assert_equal(b['f'], 42)
float_seq = [1.0, 2.0, 3.0, 4.0]
int_seq = [1, 2, 3]
for op in ('min', 'max', 'sum'):
assert (dh.reduce_float_sequence(float_seq, op) == float_seq).all()
assert (dh.reduce_int_sequence(int_seq, op) == int_seq).all()
def test_basic_blocking_staggered():
f = ps.fields("f: double[2D]")
stag = ps.fields("stag(2): double[2D]", field_type=ps.FieldType.STAGGERED)
terms = [
f[0, 0] - f[-1, 0],
f[0, 0] - f[0, -1],
]
assignments = [ps.Assignment(stag.staggered_access(d), terms[i]) for i, d in enumerate(stag.staggered_stencil)]
kernel = ps.create_staggered_kernel(assignments, cpu_blocking=(3, 16)).compile()
reference_kernel = ps.create_staggered_kernel(assignments).compile()
f_arr = np.random.rand(80, 33)
stag_arr = np.zeros((80, 33, 3))
stag_ref = np.zeros((80, 33, 3))
kernel(f=f_arr, stag=stag_arr)
reference_kernel(f=f_arr, stag=stag_ref)
np.testing.assert_almost_equal(stag_arr, stag_ref)
def test_basic_vectorization():
supported_instruction_sets = get_supported_instruction_sets()
if supported_instruction_sets:
instruction_set = supported_instruction_sets[-1]
else:
instruction_set = None
f, g = ps.fields("f, g : double[2D]")
update_rule = [ps.Assignment(g[0, 0], f[0, 0] + f[-1, 0] + f[1, 0] + f[0, 1] + f[0, -1] + 42.0)]
ast = ps.create_kernel(update_rule)
replace_inner_stride_with_one(ast)
vectorize(ast, instruction_set=instruction_set)
func = ast.compile()
arr = np.ones((23 + 2, 17 + 2)) * 5.0
dst = np.zeros_like(arr)
func(g=dst, f=arr)
np.testing.assert_equal(dst[1:-1, 1:-1], 5 * 5.0 + 42.0)
\ No newline at end of file
import numpy as np
import pytest
import pystencils as ps
from pystencils.astnodes import SympyAssignment
from pystencils.node_collection import NodeCollection
from pystencils.rng import PhiloxFourFloats, PhiloxTwoDoubles, AESNIFourFloats, AESNITwoDoubles, random_symbol
from pystencils.backends.simd_instruction_sets import get_supported_instruction_sets
from pystencils.cpu.cpujit import get_compiler_config
from pystencils.typing import TypedSymbol
from pystencils.enums import Target
RNGs = {('philox', 'float'): PhiloxFourFloats, ('philox', 'double'): PhiloxTwoDoubles,
('aesni', 'float'): AESNIFourFloats, ('aesni', 'double'): AESNITwoDoubles}
instruction_sets = get_supported_instruction_sets()
if get_compiler_config()['os'] == 'windows':
# skip instruction sets supported by the CPU but not by the compiler
if 'avx' in instruction_sets and ('/arch:avx2' not in get_compiler_config()['flags'].lower()
and '/arch:avx512' not in get_compiler_config()['flags'].lower()):
instruction_sets.remove('avx')
if 'avx512' in instruction_sets and '/arch:avx512' not in get_compiler_config()['flags'].lower():
instruction_sets.remove('avx512')
if 'avx512vl' in instruction_sets and '/arch:avx512' not in get_compiler_config()['flags'].lower():
instruction_sets.remove('avx512vl')
@pytest.mark.parametrize('target, rng', ((Target.CPU, 'philox'), (Target.CPU, 'aesni'), (Target.GPU, 'philox')))
@pytest.mark.parametrize('precision', ('float', 'double'))
@pytest.mark.parametrize('dtype', ('float', 'double'))
def test_rng(target, rng, precision, dtype, t=124, offsets=(0, 0), keys=(0, 0), offset_values=None):
if target == Target.GPU:
pytest.importorskip('cupy')
if instruction_sets and {'neon', 'sve', 'sve2', 'sme', 'vsx', 'rvv'}.intersection(instruction_sets) and rng == 'aesni':
pytest.xfail('AES not yet implemented for this architecture')
if rng == 'aesni' and len(keys) == 2:
keys *= 2
if offset_values is None:
offset_values = offsets
dh = ps.create_data_handling((2, 2), default_ghost_layers=0, default_target=target)
f = dh.add_array("f", values_per_cell=4 if precision == 'float' else 2,
dtype=np.float32 if dtype == 'float' else np.float64)
dh.fill(f.name, 42.0)
rng_node = RNGs[(rng, precision)](dh.dim, offsets=offsets, keys=keys)
assignments = [rng_node] + [SympyAssignment(f(i), s) for i, s in enumerate(rng_node.result_symbols)]
kernel = ps.create_kernel(assignments, target=dh.default_target).compile()
dh.all_to_gpu()
kwargs = {'time_step': t}
if offset_values != offsets:
kwargs.update({k.name: v for k, v in zip(offsets, offset_values)})
dh.run_kernel(kernel, **kwargs)
dh.all_to_cpu()
arr = dh.gather_array(f.name)
assert np.logical_and(arr <= 1.0, arr >= 0).all()
if rng == 'philox' and t == 124 and offsets == (0, 0) and keys == (0, 0) and dh.shape == (2, 2):
int_reference = np.array([[[3576608082, 1252663339, 1987745383, 348040302],
[1032407765, 970978240, 2217005168, 2424826293]],
[[2958765206, 3725192638, 2623672781, 1373196132],
[850605163, 1694561295, 3285694973, 2799652583]]])
else:
pytest.importorskip('randomgen')
if rng == 'aesni':
from randomgen import AESCounter
int_reference = np.empty(dh.shape + (4,), dtype=int)
for x in range(dh.shape[0]):
for y in range(dh.shape[1]):
r = AESCounter(counter=t + (x + offset_values[0]) * 2 ** 32 + (y + offset_values[1]) * 2 ** 64,
key=keys[0] + keys[1] * 2 ** 32 + keys[2] * 2 ** 64 + keys[3] * 2 ** 96,
mode="sequence")
a, b = r.random_raw(size=2)
int_reference[x, y, :] = [a % 2 ** 32, a // 2 ** 32, b % 2 ** 32, b // 2 ** 32]
else:
from randomgen import Philox
int_reference = np.empty(dh.shape + (4,), dtype=int)
for x in range(dh.shape[0]):
for y in range(dh.shape[1]):
r = Philox(counter=t + (x + offset_values[0]) * 2 ** 32 + (y + offset_values[1]) * 2 ** 64 - 1,
key=keys[0] + keys[1] * 2 ** 32, number=4, width=32, mode="sequence")
int_reference[x, y, :] = r.random_raw(size=4)
if precision == 'float' or dtype == 'float':
eps = np.finfo(np.float32).eps
else:
eps = np.finfo(np.float64).eps
if rng == 'aesni': # precision appears to be slightly worse
eps = max(1e-12, 2 * eps)
if precision == 'float':
reference = int_reference * 2. ** -32 + 2. ** -33
else:
x = int_reference[:, :, 0::2]
y = int_reference[:, :, 1::2]
z = x ^ y << (53 - 32)
reference = z * 2. ** -53 + 2. ** -54
assert np.allclose(arr, reference, rtol=0, atol=eps)
@pytest.mark.parametrize('vectorized', (False, True))
@pytest.mark.parametrize('kind', ('value', 'symbol'))
def test_rng_offsets(kind, vectorized):
if vectorized:
test = test_rng_vectorized
if not instruction_sets:
pytest.skip("cannot detect CPU instruction set")
else:
test = test_rng
if kind == 'value':
test(instruction_sets[-1] if vectorized else Target.CPU, 'philox', 'float', 'float', t=8,
offsets=(6, 7), keys=(5, 309))
elif kind == 'symbol':
offsets = (TypedSymbol("x0", np.uint32), TypedSymbol("y0", np.uint32))
test(instruction_sets[-1] if vectorized else Target.GPU, 'philox', 'float', 'float', t=8,
offsets=offsets, offset_values=(6, 7), keys=(5, 309))
@pytest.mark.parametrize('target', instruction_sets)
@pytest.mark.parametrize('rng', ('philox', 'aesni'))
@pytest.mark.parametrize('precision,dtype', (('float', 'float'), ('double', 'double')))
def test_rng_vectorized(target, rng, precision, dtype, t=130, offsets=(1, 3), keys=(0, 0), offset_values=None):
if (target in ['neon', 'vsx', 'rvv', 'sme'] or target.startswith('sve')) and rng == 'aesni':
pytest.xfail('AES not yet implemented for this architecture')
cpu_vectorize_info = {'assume_inner_stride_one': True, 'assume_aligned': True, 'instruction_set': target}
dh = ps.create_data_handling((131, 131), default_ghost_layers=0, default_target=Target.CPU)
f = dh.add_array("f", values_per_cell=4 if precision == 'float' else 2,
dtype=np.float32 if dtype == 'float' else np.float64, alignment=True)
dh.fill(f.name, 42.0)
ref = dh.add_array("ref", values_per_cell=4 if precision == 'float' else 2)
rng_node = RNGs[(rng, precision)](dh.dim, offsets=offsets)
assignments = [rng_node] + [SympyAssignment(ref(i), s) for i, s in enumerate(rng_node.result_symbols)]
kernel = ps.create_kernel(assignments, target=dh.default_target).compile()
kwargs = {'time_step': t}
if offset_values is not None:
kwargs.update({k.name: v for k, v in zip(offsets, offset_values)})
dh.run_kernel(kernel, **kwargs)
rng_node = RNGs[(rng, precision)](dh.dim, offsets=offsets)
assignments = [rng_node] + [SympyAssignment(f(i), s) for i, s in enumerate(rng_node.result_symbols)]
kernel = ps.create_kernel(assignments, target=dh.default_target, cpu_vectorize_info=cpu_vectorize_info).compile()
dh.run_kernel(kernel, **kwargs)
ref_data = dh.gather_array(ref.name)
data = dh.gather_array(f.name)
assert np.allclose(ref_data, data)
@pytest.mark.parametrize('vectorized', (False, True))
def test_rng_symbol(vectorized):
"""Make sure that the RNG symbol generator generates symbols and that the resulting code compiles"""
cpu_vectorize_info = None
if vectorized:
if not instruction_sets:
pytest.skip("cannot detect CPU instruction set")
else:
cpu_vectorize_info = {'assume_inner_stride_one': True, 'assume_aligned': True,
'instruction_set': instruction_sets[-1]}
dh = ps.create_data_handling((8, 8), default_ghost_layers=0, default_target=Target.CPU)
f = dh.add_array("f", values_per_cell=2 * dh.dim, alignment=True)
nc = NodeCollection([SympyAssignment(f(i), 0) for i in range(f.shape[-1])])
subexpressions = []
rng_symbol_gen = random_symbol(subexpressions, dim=dh.dim)
for i in range(f.shape[-1]):
nc.all_assignments[i] = SympyAssignment(nc.all_assignments[i].lhs, next(rng_symbol_gen))
symbols = [a.rhs for a in nc.all_assignments]
[nc.all_assignments.insert(0, subexpression) for subexpression in subexpressions]
assert len(symbols) == f.shape[-1] and len(set(symbols)) == f.shape[-1]
ps.create_kernel(nc, target=dh.default_target, cpu_vectorize_info=cpu_vectorize_info).compile()
@pytest.mark.parametrize('vectorized', (False, True))
def test_staggered(vectorized):
"""Make sure that the RNG counter can be substituted during loop cutting"""
dh = ps.create_data_handling((8, 8), default_ghost_layers=0, default_target=Target.CPU)
j = dh.add_array("j", values_per_cell=dh.dim, field_type=ps.FieldType.STAGGERED_FLUX)
a = ps.AssignmentCollection([ps.Assignment(j.staggered_access(n), 0) for n in j.staggered_stencil])
rng_symbol_gen = random_symbol(a.subexpressions, dim=dh.dim, rng_node=PhiloxTwoDoubles)
a.main_assignments[0] = ps.Assignment(a.main_assignments[0].lhs, next(rng_symbol_gen))
kernel = ps.create_staggered_kernel(a, target=dh.default_target).compile()
if not vectorized:
return
if not instruction_sets:
pytest.skip("cannot detect CPU instruction set")
pytest.importorskip('islpy')
cpu_vectorize_info = {'assume_inner_stride_one': True, 'assume_aligned': False,
'instruction_set': instruction_sets[-1]}
dh.fill(j.name, 867)
dh.run_kernel(kernel, seed=5, time_step=309)
ref_data = dh.gather_array(j.name)
kernel2 = ps.create_staggered_kernel(a, target=dh.default_target, cpu_vectorize_info=cpu_vectorize_info).compile()
dh.fill(j.name, 867)
dh.run_kernel(kernel2, seed=5, time_step=309)
data = dh.gather_array(j.name)
assert np.allclose(ref_data, data)
from pystencils.cache import sharedmethodcache
class Fib:
def __init__(self):
self.fib_rec_called = 0
self.fib_iter_called = 0
@sharedmethodcache("fib_cache")
def fib_rec(self, n):
self.fib_rec_called += 1
return 1 if n <= 1 else self.fib_rec(n-1) + self.fib_rec(n-2)
@sharedmethodcache("fib_cache")
def fib_iter(self, n):
self.fib_iter_called += 1
f1, f2 = 0, 1
for i in range(n):
f2 = f1 + f2
f1 = f2 - f1
return f2
def test_fib_memoization_1():
fib = Fib()
assert "fib_cache" not in fib.__dict__
f13 = fib.fib_rec(13)
assert fib.fib_rec_called == 14
assert "fib_cache" in fib.__dict__
assert fib.fib_cache[(13,)] == f13
for k in range(14):
# fib_iter should use cached results from fib_rec
fib.fib_iter(k)
assert fib.fib_iter_called == 0
def test_fib_memoization_2():
fib = Fib()
f11 = fib.fib_iter(11)
f12 = fib.fib_iter(12)
assert fib.fib_iter_called == 2
f13 = fib.fib_rec(13)
# recursive calls should be cached
assert fib.fib_rec_called == 1
class Triad:
def __init__(self):
self.triad_called = 0
@sharedmethodcache("triad_cache")
def triad(self, a, b, c=0):
"""Computes the triad a*b+c."""
self.triad_called += 1
return a * b + c
def test_triad_memoization():
triad = Triad()
assert triad.triad.__doc__ == "Computes the triad a*b+c."
t = triad.triad(12, 4, 15)
assert triad.triad_called == 1
assert triad.triad_cache[(12, 4, 15)] == t
t = triad.triad(12, 4, c=15)
assert triad.triad_called == 2
assert triad.triad_cache[(12, 4, 'c', 15)] == t
t = triad.triad(12, 4, 15)
assert triad.triad_called == 2
t = triad.triad(12, 4, c=15)
assert triad.triad_called == 2
import sympy as sp import sympy as sp
import pystencils as ps
from pystencils import Assignment, AssignmentCollection from pystencils import Assignment, AssignmentCollection
from pystencils.simp import SimplificationStrategy, apply_on_all_subexpressions, \ from pystencils.simp import (
subexpression_substitution_in_existing_subexpressions SimplificationStrategy, apply_on_all_subexpressions,
subexpression_substitution_in_existing_subexpressions)
def test_simplification_strategy(): def test_simplification_strategy():
...@@ -27,7 +30,7 @@ def test_simplification_strategy(): ...@@ -27,7 +30,7 @@ def test_simplification_strategy():
result = strategy(ac) result = strategy(ac)
assert result.operation_count['adds'] == 7 assert result.operation_count['adds'] == 7
assert result.operation_count['muls'] == 5 assert result.operation_count['muls'] == 4
assert result.operation_count['divs'] == 0 assert result.operation_count['divs'] == 0
# Trigger display routines, such that they are at least executed # Trigger display routines, such that they are at least executed
...@@ -41,3 +44,45 @@ def test_simplification_strategy(): ...@@ -41,3 +44,45 @@ def test_simplification_strategy():
assert 'Adds' in report._repr_html_() assert 'Adds' in report._repr_html_()
assert 'factor' in str(strategy) assert 'factor' in str(strategy)
def test_split_inner_loop():
dst = ps.fields('dst(8): double[2D]')
s = sp.symbols('s_:8')
x = sp.symbols('x')
subexpressions = []
main = [
Assignment(dst[0, 0](0), s[0]),
Assignment(dst[0, 0](1), s[1]),
Assignment(dst[0, 0](2), s[2]),
Assignment(dst[0, 0](3), s[3]),
Assignment(dst[0, 0](4), s[4]),
Assignment(dst[0, 0](5), s[5]),
Assignment(dst[0, 0](6), s[6]),
Assignment(dst[0, 0](7), s[7]),
Assignment(x, sum(s))
]
ac = AssignmentCollection(main, subexpressions)
split_groups = [[dst[0, 0](0), dst[0, 0](1)],
[dst[0, 0](2), dst[0, 0](3)],
[dst[0, 0](4), dst[0, 0](5)],
[dst[0, 0](6), dst[0, 0](7), x]]
ac.simplification_hints['split_groups'] = split_groups
ast = ps.create_kernel(ac)
code = ps.get_code_str(ast)
ps.show_code(ast)
# we have four inner loops as indicated in split groups (4 elements) plus one outer loop
assert code.count('for') == 5
ast = ps.create_kernel(ac, target=ps.Target.GPU)
code = ps.get_code_str(ast)
# on GPUs is wouldn't be good to use loop splitting
assert code.count('for') == 0
ac = AssignmentCollection(main, subexpressions)
ast = ps.create_kernel(ac)
code = ps.get_code_str(ast)
# one inner loop and one outer loop
assert code.count('for') == 2
from sys import version_info as vs
import pytest
import pystencils.config
import sympy as sp
import pystencils as ps
from pystencils import Assignment, AssignmentCollection, fields
from pystencils.simp import subexpression_substitution_in_main_assignments
from pystencils.simp import add_subexpressions_for_divisions
from pystencils.simp import add_subexpressions_for_sums
from pystencils.simp import add_subexpressions_for_field_reads
from pystencils.simp.simplifications import add_subexpressions_for_constants
from pystencils.typing import BasicType, TypedSymbol
a, b, c, d, x, y, z = sp.symbols("a b c d x y z")
s0, s1, s2, s3 = sp.symbols("s_:4")
f = sp.symbols("f_:9")
def test_subexpression_substitution_in_main_assignments():
subexpressions = [
Assignment(s0, 2 * a + 2 * b),
Assignment(s1, 2 * a + 2 * b + 2 * c),
Assignment(s2, 2 * a + 2 * b + 2 * c + 2 * d),
Assignment(s3, 2 * a + 2 * b * c),
Assignment(x, s1 + s2 + s0 + s3)
]
main = [
Assignment(f[0], s1 + s2 + s0 + s3),
Assignment(f[1], s1 + s2 + s0 + s3),
Assignment(f[2], s1 + s2 + s0 + s3),
Assignment(f[3], s1 + s2 + s0 + s3),
Assignment(f[4], s1 + s2 + s0 + s3)
]
ac = AssignmentCollection(main, subexpressions)
ac = subexpression_substitution_in_main_assignments(ac)
for i in range(0, len(ac.main_assignments)):
assert ac.main_assignments[i].rhs == x
def test_add_subexpressions_for_divisions():
subexpressions = [
Assignment(s0, 2 / a + 2 / b),
Assignment(s1, 2 / a + 2 / b + 2 / c),
Assignment(s2, 2 / a + 2 / b + 2 / c + 2 / d),
Assignment(s3, 2 / a + 2 / b / c),
Assignment(x, s1 + s2 + s0 + s3)
]
main = [
Assignment(f[0], s1 + s2 + s0 + s3)
]
ac = AssignmentCollection(main, subexpressions)
divs_before_optimisation = ac.operation_count["divs"]
ac = add_subexpressions_for_divisions(ac)
divs_after_optimisation = ac.operation_count["divs"]
assert divs_before_optimisation - divs_after_optimisation == 8
rhs = []
for i in range(len(ac.subexpressions)):
rhs.append(ac.subexpressions[i].rhs)
assert 1/a in rhs
assert 1/b in rhs
assert 1/c in rhs
assert 1/d in rhs
def test_add_subexpressions_for_constants():
half = sp.Rational(1,2)
sqrt_2 = sp.sqrt(2)
main = [
Assignment(f[0], half * a + half * b + half * c),
Assignment(f[1], - half * a - half * b),
Assignment(f[2], a * sqrt_2 - b * sqrt_2),
Assignment(f[3], a**2 + b**2)
]
ac = AssignmentCollection(main)
ac = add_subexpressions_for_constants(ac)
assert len(ac.subexpressions) == 2
half_subexp = None
sqrt_subexp = None
for asm in ac.subexpressions:
if asm.rhs == half:
half_subexp = asm.lhs
elif asm.rhs == sqrt_2:
sqrt_subexp = asm.lhs
else:
pytest.fail(f"An unexpected subexpression was encountered: {asm}")
assert half_subexp is not None
assert sqrt_subexp is not None
for asm in ac.main_assignments[:3]:
assert isinstance(asm.rhs, sp.Mul)
assert any(arg == half_subexp for arg in ac.main_assignments[0].rhs.args)
assert any(arg == half_subexp for arg in ac.main_assignments[1].rhs.args)
assert any(arg == sqrt_subexp for arg in ac.main_assignments[2].rhs.args)
# Do not replace exponents!
assert ac.main_assignments[3].rhs == a**2 + b**2
def test_add_subexpressions_for_sums():
subexpressions = [
Assignment(s0, a + b + c + d),
Assignment(s1, 3 * a * sp.sqrt(x) + 4 * b + c),
Assignment(s2, 3 * a * sp.sqrt(x) + 4 * b + c),
Assignment(s3, 3 * a * sp.sqrt(x) + 4 * b + c)
]
main = [
Assignment(f[0], s1 + s2 + s0 + s3)
]
ac = AssignmentCollection(main, subexpressions)
ops_before_optimisation = ac.operation_count
ac = add_subexpressions_for_sums(ac)
ops_after_optimisation = ac.operation_count
assert ops_after_optimisation["adds"] == ops_before_optimisation["adds"]
assert ops_after_optimisation["muls"] < ops_before_optimisation["muls"]
assert ops_after_optimisation["sqrts"] < ops_before_optimisation["sqrts"]
rhs = []
for i in range(len(ac.subexpressions)):
rhs.append(ac.subexpressions[i].rhs)
assert a + b + c + d in rhs
assert 3 * a * sp.sqrt(x) in rhs
def test_add_subexpressions_for_field_reads():
s, v = fields("s(5), v(5): double[2D]")
subexpressions = []
main = [Assignment(s[0, 0](0), 3 * v[0, 0](0)),
Assignment(s[0, 0](1), 10 * v[0, 0](1))]
ac = AssignmentCollection(main, subexpressions)
assert len(ac.subexpressions) == 0
ac2 = add_subexpressions_for_field_reads(ac)
assert len(ac2.subexpressions) == 2
ac3 = add_subexpressions_for_field_reads(ac, data_type="float32")
assert len(ac3.subexpressions) == 2
assert isinstance(ac3.subexpressions[0].lhs, TypedSymbol)
assert ac3.subexpressions[0].lhs.dtype == BasicType("float32")
# added check for early out of add_subexpressions_for_field_reads is no fields appear on the rhs (See #92)
main = [Assignment(s[0, 0](0), 3.0),
Assignment(s[0, 0](1), 4.0)]
ac4 = AssignmentCollection(main, subexpressions)
assert len(ac4.subexpressions) == 0
ac5 = add_subexpressions_for_field_reads(ac4)
assert ac5 is not None
assert ac4 is ac5
@pytest.mark.parametrize('target', (ps.Target.CPU, ps.Target.GPU))
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.skipif((vs.major, vs.minor, vs.micro) == (3, 8, 2), reason="does not work on python 3.8.2 for some reason")
def test_sympy_optimizations(target, dtype):
if target == ps.Target.GPU:
pytest.importorskip("cupy")
src, dst = ps.fields(f'src, dst: {dtype}[2d]')
assignments = ps.AssignmentCollection({
src[0, 0]: 1.0 * (sp.exp(dst[0, 0]) - 1)
})
config = pystencils.config.CreateKernelConfig(target=target, default_number_float=dtype)
ast = ps.create_kernel(assignments, config=config)
ps.show_code(ast)
code = ps.get_code_str(ast)
if dtype == 'float32':
assert 'expf(' in code
elif dtype == 'float64':
assert 'exp(' in code
@pytest.mark.parametrize('target', (ps.Target.CPU, ps.Target.GPU))
@pytest.mark.parametrize('simplification', (True, False))
@pytest.mark.skipif((vs.major, vs.minor, vs.micro) == (3, 8, 2), reason="does not work on python 3.8.2 for some reason")
def test_evaluate_constant_terms(target, simplification):
if target == ps.Target.GPU:
pytest.importorskip("cupy")
src, dst = ps.fields('src, dst: float32[2d]')
# cos of a number will always be simplified
assignments = ps.AssignmentCollection({
src[0, 0]: -sp.cos(1) + dst[0, 0]
})
config = pystencils.config.CreateKernelConfig(target=target, default_assignment_simplifications=simplification)
ast = ps.create_kernel(assignments, config=config)
code = ps.get_code_str(ast)
assert 'cos(' not in code
import numpy as np import numpy as np
import pytest import pytest
from pystencils import Field, Assignment, fields, create_kernel
import pystencils
import sympy as sp import sympy as sp
from pystencils import Assignment, Field, create_kernel, fields
def test_size_check(): def test_size_check():
"""Kernel with two fixed-sized fields creating with same size but calling with wrong size""" """Kernel with two fixed-sized fields creating with same size but calling with wrong size"""
...@@ -22,7 +25,7 @@ def test_size_check(): ...@@ -22,7 +25,7 @@ def test_size_check():
with pytest.raises(ValueError) as e: with pytest.raises(ValueError) as e:
func(src=src, dst=dst) func(src=src, dst=dst)
assert 'Wrong shape' in str(e) assert 'Wrong shape' in str(e.value)
def test_fixed_size_mismatch_check(): def test_fixed_size_mismatch_check():
...@@ -37,7 +40,7 @@ def test_fixed_size_mismatch_check(): ...@@ -37,7 +40,7 @@ def test_fixed_size_mismatch_check():
with pytest.raises(ValueError) as e: with pytest.raises(ValueError) as e:
create_kernel([update_rule]) create_kernel([update_rule])
assert 'Differently sized field accesses' in str(e) assert 'Differently sized field accesses' in str(e.value)
def test_fixed_and_variable_field_check(): def test_fixed_and_variable_field_check():
...@@ -52,7 +55,7 @@ def test_fixed_and_variable_field_check(): ...@@ -52,7 +55,7 @@ def test_fixed_and_variable_field_check():
with pytest.raises(ValueError) as e: with pytest.raises(ValueError) as e:
create_kernel(update_rule) create_kernel(update_rule)
assert 'Mixing fixed-shaped and variable-shape fields' in str(e) assert 'Mixing fixed-shaped and variable-shape fields' in str(e.value)
def test_two_variable_shaped_fields(): def test_two_variable_shaped_fields():
...@@ -69,7 +72,7 @@ def test_two_variable_shaped_fields(): ...@@ -69,7 +72,7 @@ def test_two_variable_shaped_fields():
with pytest.raises(TypeError) as e: with pytest.raises(TypeError) as e:
func(src=src, dst=dst) func(src=src, dst=dst)
assert 'must have same' in str(e) assert 'must have same' in str(e.value)
def test_ssa_checks(): def test_ssa_checks():
...@@ -80,18 +83,18 @@ def test_ssa_checks(): ...@@ -80,18 +83,18 @@ def test_ssa_checks():
create_kernel([Assignment(c, f[0, 1]), create_kernel([Assignment(c, f[0, 1]),
Assignment(c, f[1, 0]), Assignment(c, f[1, 0]),
Assignment(g[0, 0], c)]) Assignment(g[0, 0], c)])
assert 'Assignments not in SSA form' in str(e) assert 'Assignments not in SSA form' in str(e.value)
with pytest.raises(ValueError) as e: with pytest.raises(ValueError) as e:
create_kernel([Assignment(c, a + 3), create_kernel([Assignment(c, a + 3),
Assignment(a, 42), Assignment(a, 42),
Assignment(g[0, 0], c)]) Assignment(g[0, 0], c)])
assert 'Symbol a is written, after it has been read' in str(e) assert 'Symbol a is written, after it has been read' in str(e.value)
with pytest.raises(ValueError) as e: with pytest.raises(ValueError) as e:
create_kernel([Assignment(c, c + 1), create_kernel([Assignment(c, c + 1),
Assignment(g[0, 0], c)]) Assignment(g[0, 0], c)])
assert 'Symbol c is written, after it has been read' in str(e) assert 'Symbol c is written, after it has been read' in str(e.value)
def test_loop_independence_checks(): def test_loop_independence_checks():
...@@ -101,17 +104,24 @@ def test_loop_independence_checks(): ...@@ -101,17 +104,24 @@ def test_loop_independence_checks():
with pytest.raises(ValueError) as e: with pytest.raises(ValueError) as e:
create_kernel([Assignment(g[0, 1], f[0, 1]), create_kernel([Assignment(g[0, 1], f[0, 1]),
Assignment(g[0, 0], f[1, 0])]) Assignment(g[0, 0], f[1, 0])])
assert 'Field g is written at two different locations' in str(e) assert 'Field g is written at two different locations' in str(e.value)
# This is allowed - because only one element of g is accessed # This is not allowed - because this is not SSA (it can be overwritten with allow_double_writes)
with pytest.raises(ValueError) as e:
create_kernel([Assignment(g[0, 2], f[0, 1]),
Assignment(g[0, 2], 2 * g[0, 2])])
# This is allowed - because allow_double_writes is True now
create_kernel([Assignment(g[0, 2], f[0, 1]), create_kernel([Assignment(g[0, 2], f[0, 1]),
Assignment(g[0, 2], 2 * g[0, 2])]) Assignment(g[0, 2], 2 * g[0, 2])],
config=pystencils.CreateKernelConfig(allow_double_writes=True))
create_kernel([Assignment(v[0, 2](1), f[0, 1]), with pytest.raises(ValueError) as e:
Assignment(v[0, 1](0), 4), create_kernel([Assignment(v[0, 2](1), f[0, 1]),
Assignment(v[0, 2](1), 2 * v[0, 2](1))]) Assignment(v[0, 1](0), 4),
Assignment(v[0, 2](1), 2 * v[0, 2](1))])
with pytest.raises(ValueError) as e: with pytest.raises(ValueError) as e:
create_kernel([Assignment(g[0, 1], 3), create_kernel([Assignment(g[0, 1], 3),
Assignment(f[0, 1], 2 * g[0, 2])]) Assignment(f[0, 1], 2 * g[0, 2])])
assert 'Field g is read at (0, 2) and written at (0, 1)' in str(e) assert 'Field g is read at (0, 2) and written at (0, 1)' in str(e.value)
import numpy as np
import sympy as sp
import pytest
from pystencils import (
Assignment,
Field,
TypedSymbol,
create_kernel,
make_slice,
Target,
create_data_handling,
)
from pystencils.simp import sympy_cse_on_assignment_list
@pytest.mark.parametrize("target", [Target.CPU, Target.GPU])
def test_sliced_iteration(target):
if target == Target.GPU:
pytest.importorskip("cupy")
size = (4, 4)
dh = create_data_handling(size, default_target=target, default_ghost_layers=0)
src_field = dh.add_array("src", 1)
dst_field = dh.add_array("dst", 1)
dh.fill(src_field.name, 1.0, ghost_layers=True)
dh.fill(dst_field.name, 0.0, ghost_layers=True)
a, b = sp.symbols("a b")
update_rule = Assignment(
dst_field[0, 0],
(
a * src_field[0, 1]
+ a * src_field[0, -1]
+ b * src_field[1, 0]
+ b * src_field[-1, 0]
)
/ 4,
)
s = make_slice[1:3, 1]
kernel = create_kernel(
sympy_cse_on_assignment_list([update_rule]), iteration_slice=s, target=target
).compile()
if target == Target.GPU:
dh.all_to_gpu()
dh.run_kernel(kernel, a=1.0, b=1.0)
if target == Target.GPU:
dh.all_to_cpu()
expected_result = np.zeros(size)
expected_result[1:3, 1] = 1
np.testing.assert_almost_equal(dh.gather_array(dst_field.name), expected_result)
@pytest.mark.parametrize("target", [Target.CPU, Target.GPU])
def test_symbols_in_slice(target):
if target == Target.GPU:
pytest.xfail("Iteration slices including arbitrary symbols are currently broken on GPU")
size = (4, 4)
dh = create_data_handling(size, default_target=target, default_ghost_layers=0)
src_field = dh.add_array("src", 1)
dst_field = dh.add_array("dst", 1)
dh.fill(src_field.name, 1.0, ghost_layers=True)
dh.fill(dst_field.name, 0.0, ghost_layers=True)
a, b = sp.symbols("a b")
update_rule = Assignment(
dst_field[0, 0],
(
a * src_field[0, 1]
+ a * src_field[0, -1]
+ b * src_field[1, 0]
+ b * src_field[-1, 0]
)
/ 4,
)
x_end = TypedSymbol("x_end", "int")
s = make_slice[1:x_end, 1]
x_end_value = size[1] - 1
kernel = create_kernel(
sympy_cse_on_assignment_list([update_rule]), iteration_slice=s, target=target
).compile()
if target == Target.GPU:
dh.all_to_gpu()
dh.run_kernel(kernel, a=1.0, b=1.0, x_end=x_end_value)
if target == Target.GPU:
dh.all_to_cpu()
expected_result = np.zeros(size)
expected_result[1:x_end_value, 1] = 1
np.testing.assert_almost_equal(dh.gather_array(dst_field.name), expected_result)
import numpy as np
from numpy.testing import assert_array_equal
from pystencils import create_data_handling
from pystencils.slicing import SlicedGetter, make_slice, SlicedGetterDataHandling, shift_slice, slice_intersection
def test_sliced_getter():
def get_slice(slice_obj=None):
arr = np.ones((10, 10))
if slice_obj is None:
slice_obj = make_slice[:, :]
return arr[slice_obj]
sli = SlicedGetter(get_slice)
test = make_slice[2:-2, 2:-2]
assert sli[test].shape == (6, 6)
def test_sliced_getter_data_handling():
domain_shape = (10, 10)
dh = create_data_handling(domain_size=domain_shape, default_ghost_layers=1)
dh.add_array("src", values_per_cell=1)
dh.fill("src", 1.0, ghost_layers=True)
dh.add_array("dst", values_per_cell=1)
dh.fill("dst", 0.0, ghost_layers=True)
sli = SlicedGetterDataHandling(dh, 'dst')
slice_obj = make_slice[2:-2, 2:-2]
assert np.sum(sli[slice_obj]) == 0
sli = SlicedGetterDataHandling(dh, 'src')
slice_obj = make_slice[2:-2, 2:-2]
assert np.sum(sli[slice_obj]) == 36
def test_shift_slice():
sh = shift_slice(make_slice[2:-2, 2:-2], [1, 2])
assert sh[0] == slice(3, -1, None)
assert sh[1] == slice(4, 0, None)
sh = shift_slice(make_slice[2:-2, 2:-2], 1)
assert sh[0] == slice(3, -1, None)
assert sh[1] == slice(3, -1, None)
sh = shift_slice([2, 4], 1)
assert sh[0] == 3
assert sh[1] == 5
sh = shift_slice([2, None], 1)
assert sh[0] == 3
assert sh[1] is None
sh = shift_slice([1.5, 1.5], 1)
assert sh[0] == 1.5
assert sh[1] == 1.5
def test_shifted_array_access():
arr = np.array(range(10))
sh = make_slice[2:5]
assert_array_equal(arr[sh], [2,3,4])
sh = shift_slice(sh, 3)
assert_array_equal(arr[sh], [5,6,7])
arr = np.array([
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
])
sh = make_slice[0:2, 0:2]
assert_array_equal(arr[sh], [[1, 2], [4, 5]])
sh = shift_slice(sh, (1,1))
assert_array_equal(arr[sh], [[5, 6], [8, 9]])
def test_slice_intersection():
sl1 = make_slice[1:10, 1:10]
sl2 = make_slice[5:15, 5:15]
intersection = slice_intersection(sl1, sl2)
assert intersection[0] == slice(5, 10, None)
assert intersection[1] == slice(5, 10, None)
sl2 = make_slice[12:15, 12:15]
intersection = slice_intersection(sl1, sl2)
assert intersection is None
%% Cell type:code id: tags:
``` python
import pytest
pytest.importorskip('waLBerla')
```
%% Output
<module 'waLBerla' from '/Users/holzer/walberla/python/waLBerla/__init__.py'>
%% Cell type:code id: tags:
``` python
from pystencils.session import *
from time import perf_counter
from statistics import median
from functools import partial
```
%% Cell type:markdown id: tags:
## Benchmark for Python call overhead
%% Cell type:code id: tags:
``` python
inner_repeats = 100
outer_repeats = 5
sizes = [2**i for i in range(1, 8)]
sizes
```
%% Output
$\displaystyle \left[ 2, \ 4, \ 8, \ 16, \ 32, \ 64, \ 128\right]$
[2, 4, 8, 16, 32, 64, 128]
%% Cell type:code id: tags:
``` python
def benchmark_pure(domain_size, extract_first=False):
src = np.zeros(domain_size)
dst = np.zeros_like(src)
f_src, f_dst = ps.fields("src, dst", src=src, dst=dst)
kernel = ps.create_kernel(ps.Assignment(f_dst.center, f_src.center)).compile()
if extract_first:
kernel = kernel.kernel
start = perf_counter()
for i in range(inner_repeats):
kernel(src=src, dst=dst)
src, dst = dst, src
end = perf_counter()
else:
start = perf_counter()
for i in range(inner_repeats):
kernel(src=src, dst=dst)
src, dst = dst, src
end = perf_counter()
return (end - start) / inner_repeats
def benchmark_datahandling(domain_size, parallel=False):
dh = ps.create_data_handling(domain_size, parallel=parallel)
f_src = dh.add_array('src')
f_dst = dh.add_array('dst')
kernel = ps.create_kernel(ps.Assignment(f_dst.center, f_src.center)).compile()
start = perf_counter()
for i in range(inner_repeats):
dh.run_kernel(kernel)
dh.swap('src', 'dst')
end = perf_counter()
return (end - start) / inner_repeats
name_to_func = {
'pure_extract': partial(benchmark_pure, extract_first=True),
'pure_no_extract': partial(benchmark_pure, extract_first=False),
'dh_serial': partial(benchmark_datahandling, parallel=False),
'dh_parallel': partial(benchmark_datahandling, parallel=True),
}
```
%% Cell type:code id: tags:
``` python
result = {'block_size': [],
'name': [],
'time': []}
for bs in sizes:
print("Computing size ", bs)
for name, func in name_to_func.items():
for i in range(outer_repeats):
time = func((bs, bs))
result['block_size'].append(bs)
result['name'].append(name)
result['time'].append(time)
```
%% Output
Computing size 2
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
/var/folders/07/0d7kq8fd0sx24cs53zz90_qc0000gp/T/ipykernel_12649/2009975470.py in <module>
7 for name, func in name_to_func.items():
8 for i in range(outer_repeats):
----> 9 time = func((bs, bs))
10 result['block_size'].append(bs)
11 result['name'].append(name)
/var/folders/07/0d7kq8fd0sx24cs53zz90_qc0000gp/T/ipykernel_12649/3509370390.py in benchmark_datahandling(domain_size, parallel)
20
21 def benchmark_datahandling(domain_size, parallel=False):
---> 22 dh = ps.create_data_handling(domain_size, parallel=parallel)
23 f_src = dh.add_array('src')
24 f_dst = dh.add_array('dst')
~/pystencils/pystencils/pystencils/datahandling/__init__.py in create_data_handling(domain_size, periodicity, default_layout, default_target, parallel, default_ghost_layers)
44 if parallel:
45 if wlb is None:
---> 46 raise ValueError("Cannot create parallel data handling because walberla module is not available")
47
48 if periodicity is False or periodicity is None:
ValueError: Cannot create parallel data handling because walberla module is not available
%% Cell type:code id: tags:
``` python
if 'is_test_run' not in globals():
import pandas as pd
import seaborn as sns
data = pd.DataFrame.from_dict(result)
plt.subplot(1,2,1)
sns.barplot(x='block_size', y='time', hue='name', data=data, alpha=0.6)
plt.yscale('log')
plt.subplot(1,2,2)
data = pd.DataFrame.from_dict(result)
sns.barplot(x='block_size', y='time', hue='name', data=data, alpha=0.6)
```
# -*- coding: utf-8 -*-
#
# Copyright © 2019 Stephan Seitz <stephan.seitz@fau.de>
#
# Distributed under terms of the GPLv3 license.
"""
"""
import pystencils
import pystencils.astnodes
import pystencils.config
def test_source_code_comment():
a, b = pystencils.fields('a,b: float[2D]')
assignments = pystencils.AssignmentCollection(
{a.center(): b[0, 2] + b[0, 0]}, {}
)
config = pystencils.config.CreateKernelConfig(target=pystencils.Target.CPU)
ast = pystencils.create_kernel(assignments, config=config)
ast.body.append(pystencils.astnodes.SourceCodeComment("Hallo"))
ast.body.append(pystencils.astnodes.EmptyLine())
ast.body.append(pystencils.astnodes.SourceCodeComment("World!"))
print(ast)
compiled = ast.compile()
assert compiled is not None
pystencils.show_code(ast)
import numpy as np
import sympy as sp
import pytest
import pystencils as ps
from pystencils import x_staggered_vector, TypedSymbol
from pystencils.enums import Target
class TestStaggeredDiffusion:
def _run(self, num_neighbors, target=ps.Target.CPU, openmp=False):
L = (40, 40)
D = 0.066
dt = 1
T = 100
dh = ps.create_data_handling(L, periodicity=True, default_target=target)
c = dh.add_array('c', values_per_cell=1)
j = dh.add_array('j', values_per_cell=num_neighbors, field_type=ps.FieldType.STAGGERED_FLUX)
x_staggered = - c[-1, 0] + c[0, 0]
y_staggered = - c[0, -1] + c[0, 0]
xy_staggered = - c[-1, -1] + c[0, 0]
xY_staggered = - c[-1, 1] + c[0, 0]
jj = j.staggered_access
divergence = -1 * D / (1 + sp.sqrt(2) if j.index_shape[0] == 4 else 1) * \
sum([jj(d) / sp.Matrix(ps.stencil.direction_string_to_offset(d)).norm() for d in j.staggered_stencil
+ [ps.stencil.inverse_direction_string(d) for d in j.staggered_stencil]])
update = [ps.Assignment(c.center, c.center + dt * divergence)]
flux = [ps.Assignment(j.staggered_access("W"), x_staggered),
ps.Assignment(j.staggered_access("S"), y_staggered)]
if j.index_shape[0] == 4:
flux += [ps.Assignment(j.staggered_access("SW"), xy_staggered),
ps.Assignment(j.staggered_access("NW"), xY_staggered)]
staggered_kernel = ps.create_staggered_kernel(flux, target=dh.default_target, cpu_openmp=openmp).compile()
div_kernel = ps.create_kernel(update, target=dh.default_target, cpu_openmp=openmp).compile()
def time_loop(steps):
sync = dh.synchronization_function([c.name])
dh.all_to_gpu()
for i in range(steps):
sync()
dh.run_kernel(staggered_kernel)
dh.run_kernel(div_kernel)
dh.all_to_cpu()
def init():
dh.fill(c.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(c.name, 0)
dh.fill(j.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.cpu_arrays[c.name][L[0] // 2:L[0] // 2 + 2, L[1] // 2:L[1] // 2 + 2] = 1.0
init()
time_loop(T)
reference = np.empty(L)
for x in range(L[0]):
for y in range(L[1]):
r = np.array([x, y]) - L[0] / 2 + 0.5
reference[x, y] = (4 * np.pi * D * T)**(-dh.dim / 2) * np.exp(-np.dot(r, r) / (4 * D * T)) * (2**dh.dim)
assert np.abs(dh.gather_array(c.name) - reference).max() < 5e-4
def test_diffusion_2(self):
self._run(2)
def test_diffusion_4(self):
self._run(4)
def test_diffusion_openmp(self):
self._run(4, openmp=True)
def test_staggered_subexpressions():
dh = ps.create_data_handling((10, 10), periodicity=True, default_target=Target.CPU)
j = dh.add_array('j', values_per_cell=2, field_type=ps.FieldType.STAGGERED)
c = sp.symbols("c")
assignments = [ps.Assignment(j.staggered_access("W"), c),
ps.Assignment(c, 1)]
ps.create_staggered_kernel(assignments, target=dh.default_target).compile()
def test_staggered_loop_cutting():
pytest.importorskip('islpy')
dh = ps.create_data_handling((4, 4), periodicity=True, default_target=Target.CPU)
j = dh.add_array('j', values_per_cell=4, field_type=ps.FieldType.STAGGERED)
assignments = [ps.Assignment(j.staggered_access("SW"), 1)]
ast = ps.create_staggered_kernel(assignments, target=dh.default_target)
assert not ast.atoms(ps.astnodes.Conditional)
def test_staggered_vector():
dim = 2
v = x_staggered_vector(dim)
ctr0 = TypedSymbol('ctr_0', 'int', nonnegative=True)
ctr1 = TypedSymbol('ctr_1', 'int', nonnegative=True)
expected_result = sp.Matrix(tuple((ctr0 + 0.5, ctr1 + 0.5)))
assert v == expected_result
\ No newline at end of file
%% Cell type:code id: tags:
``` python
import pystencils as ps
import sympy as sp
from pystencils.stencil import coefficient_list, plot_expression, plot
```
%% Cell type:code id: tags:
``` python
sten = ((0, 0, 0), (1, 0, 0), (0, 1, 0), (0, 0, 1),
(-1, 0, 0), (0, -1, 0), (0, 0, -1))
plot(sten)
```
%% Output
import pystencils as ps
import sympy as sp
from pystencils.stencil import coefficient_list, plot_expression
import pystencils.plot as plt
def test_coefficient_list():
f = ps.fields("f: double[1D]")
expr = 2 * f[1] + 3 * f[-1]
coff = coefficient_list(expr)
assert coff == [3, 0, 2]
figure = plt.figure()
plot_expression(expr, matrix_form=True, figure=figure)
f = ps.fields("f: double[3D]")
expr = 2 * f[1, 0, 0] + 3 * f[0, -1, 0]
coff = coefficient_list(expr)
assert coff == [[[0, 3, 0], [0, 0, 2], [0, 0, 0]]]
expr = 2 * f[1, 0, 0] + 3 * f[0, -1, 0] + 4 * f[0, 0, 1]
coff = coefficient_list(expr, matrix_form=True)
assert coff[0] == sp.zeros(3, 3)
# in 3D plot only works if there are entries on every of the three 2D planes. In the above examples z-1 was empty
expr = 2 * f[1, 0, 0] + 1 * f[0, -1, 0] + 1 * f[0, 0, 1] + f[0, 0, -1]
figure = plt.figure()
plot_expression(expr, figure=figure)
def test_plot_expression():
f = ps.fields("f: double[2D]")
figure = plt.figure()
plot_expression(2 * f[1, 0] + 3 * f[0, -1], matrix_form=True, figure=figure)
import numpy as np import numpy as np
from pystencils import Field, create_kernel, Assignment
from pystencils import Assignment, Field, create_kernel
def test_fixed_sized_field(): def test_fixed_sized_field():
......
from pystencils import fields, Assignment, AssignmentCollection
from pystencils.simp.subexpression_insertion import *
def test_subexpression_insertion():
f, g = fields('f(10), g(10) : [2D]')
xi = sp.symbols('xi_:10')
xi_set = set(xi)
subexpressions = [
Assignment(xi[0], -f(4)),
Assignment(xi[1], -(f(1) * f(2))),
Assignment(xi[2], 2.31 * f(5)),
Assignment(xi[3], 1.8 + f(5) + f(6)),
Assignment(xi[4], 5.7 + f(6)),
Assignment(xi[5], (f(4) + f(5))**2),
Assignment(xi[6], f(3)**2),
Assignment(xi[7], f(4)),
Assignment(xi[8], 13),
Assignment(xi[9], 0),
]
assignments = [Assignment(g(i), x) for i, x in enumerate(xi)]
ac = AssignmentCollection(assignments, subexpressions=subexpressions)
ac_ins = insert_symbol_times_minus_one(ac)
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[0]})
ac_ins = insert_constant_multiples(ac)
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[0], xi[2]})
ac_ins = insert_constant_additions(ac)
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[4]})
ac_ins = insert_squares(ac)
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[6]})
ac_ins = insert_aliases(ac)
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[7]})
ac_ins = insert_zeros(ac)
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[9]})
ac_ins = insert_constants(ac, skip={xi[9]})
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[8]})
# -*- coding: utf-8 -*-
#
# Copyright © 2019 Stephan Seitz <stephan.seitz@fau.de>
#
# Distributed under terms of the GPLv3 license.
"""
"""
import pytest
import numpy as np
import pystencils.config
import sympy as sp
import sympy.abc
import pystencils as ps
from pystencils.typing import create_type
@pytest.mark.parametrize('dtype', ["float64", "float32"])
def test_sum(dtype):
sum = sp.Sum(sp.abc.k, (sp.abc.k, 1, 100))
expanded_sum = sum.doit()
# print(sum)
# print(expanded_sum)
x = ps.fields(f'x: {dtype}[1d]')
assignments = ps.AssignmentCollection({x.center(): sum})
ast = ps.create_kernel(assignments)
code = ps.get_code_str(ast)
kernel = ast.compile()
# ps.show_code(ast)
if dtype == "float32":
assert "5050.0f;" in code
array = np.zeros((10,), dtype=dtype)
kernel(x=array)
assert np.allclose(array, int(expanded_sum) * np.ones_like(array))
@pytest.mark.parametrize('dtype', ["int32", "int64", "float64", "float32"])
def test_product(dtype):
k = ps.TypedSymbol('k', create_type(dtype))
sum = sympy.Product(k, (k, 1, 10))
expanded_sum = sum.doit()
# print(sum)
# print(expanded_sum)
x = ps.fields(f'x: {dtype}[1d]')
assignments = ps.AssignmentCollection({x.center(): sum})
config = pystencils.config.CreateKernelConfig()
ast = ps.create_kernel(assignments, config=config)
code = ps.get_code_str(ast)
kernel = ast.compile()
# print(code)
if dtype == "int64" or dtype == "int32":
assert '3628800;' in code
elif dtype == "float32":
assert '3628800.0f;' in code
else:
assert '3628800.0;' in code
array = np.zeros((10,), dtype=dtype)
kernel(x=array)
assert np.allclose(array, int(expanded_sum) * np.ones_like(array))
# TODO: See Issue !55
# def test_prod_var_limit():
#
# k = ps.TypedSymbol('k', create_type('int64'))
# limit = ps.TypedSymbol('limit', create_type('int64'))
#
# sum = sympy.Sum(k, (k, 1, limit))
# expanded_sum = sum.replace(limit, 100).doit()
#
# print(sum)
# print(expanded_sum)
#
# x = ps.fields('x: int64[1d]')
#
# assignments = ps.AssignmentCollection({x.center(): sum})
#
# ast = ps.create_kernel(assignments)
# ps.show_code(ast)
# kernel = ast.compile()
#
# array = np.zeros((10,), np.int64)
#
# kernel(x=array, limit=100)
#
# assert np.allclose(array, int(expanded_sum) * np.ones_like(array))
import sympy
import numpy as np
import sympy as sp
import pystencils
from pystencils.sympyextensions import replace_second_order_products
from pystencils.sympyextensions import remove_higher_order_terms
from pystencils.sympyextensions import complete_the_squares_in_exp
from pystencils.sympyextensions import extract_most_common_factor
from pystencils.sympyextensions import simplify_by_equality
from pystencils.sympyextensions import count_operations
from pystencils.sympyextensions import common_denominator
from pystencils.sympyextensions import get_symmetric_part
from pystencils.sympyextensions import scalar_product
from pystencils.sympyextensions import kronecker_delta
from pystencils import Assignment
from pystencils.functions import DivFunc
from pystencils.fast_approximation import (fast_division, fast_inv_sqrt, fast_sqrt,
insert_fast_divisions, insert_fast_sqrts)
def test_utility():
a = [1, 2]
b = (2, 3)
a_np = np.array(a)
b_np = np.array(b)
assert scalar_product(a, b) == np.dot(a_np, b_np)
a = sympy.Symbol("a")
b = sympy.Symbol("b")
assert kronecker_delta(a, a, a, b) == 0
assert kronecker_delta(a, a, a, a) == 1
assert kronecker_delta(3, 3, 3, 2) == 0
assert kronecker_delta(2, 2, 2, 2) == 1
assert kronecker_delta([10] * 100) == 1
assert kronecker_delta((0, 1), (0, 1)) == 1
def test_replace_second_order_products():
x, y = sympy.symbols('x y')
expr = 4 * x * y
expected_expr_positive = 2 * ((x + y) ** 2 - x ** 2 - y ** 2)
expected_expr_negative = 2 * (-(x - y) ** 2 + x ** 2 + y ** 2)
result = replace_second_order_products(expr, search_symbols=[x, y], positive=True)
assert result == expected_expr_positive
assert (result - expected_expr_positive).simplify() == 0
result = replace_second_order_products(expr, search_symbols=[x, y], positive=False)
assert result == expected_expr_negative
assert (result - expected_expr_negative).simplify() == 0
result = replace_second_order_products(expr, search_symbols=[x, y], positive=None)
assert result == expected_expr_positive
a = [Assignment(sympy.symbols('z'), x + y)]
replace_second_order_products(expr, search_symbols=[x, y], positive=True, replace_mixed=a)
assert len(a) == 2
assert replace_second_order_products(4 + y, search_symbols=[x, y]) == y + 4
def test_remove_higher_order_terms():
x, y = sympy.symbols('x y')
expr = sympy.Mul(x, y)
result = remove_higher_order_terms(expr, order=1, symbols=[x, y])
assert result == 0
result = remove_higher_order_terms(expr, order=2, symbols=[x, y])
assert result == expr
expr = sympy.Pow(x, 3)
result = remove_higher_order_terms(expr, order=2, symbols=[x, y])
assert result == 0
result = remove_higher_order_terms(expr, order=3, symbols=[x, y])
assert result == expr
def test_complete_the_squares_in_exp():
a, b, c, s, n = sympy.symbols('a b c s n')
expr = a * s ** 2 + b * s + c
result = complete_the_squares_in_exp(expr, symbols_to_complete=[s])
assert result == expr
expr = sympy.exp(a * s ** 2 + b * s + c)
expected_result = sympy.exp(a*s**2 + c - b**2 / (4*a))
result = complete_the_squares_in_exp(expr, symbols_to_complete=[s])
assert result == expected_result
def test_extract_most_common_factor():
x, y = sympy.symbols('x y')
expr = 1 / (x + y) + 3 / (x + y) + 3 / (x + y)
most_common_factor = extract_most_common_factor(expr)
assert most_common_factor[0] == 7
assert sympy.prod(most_common_factor) == expr
expr = 1 / x + 3 / (x + y) + 3 / y
most_common_factor = extract_most_common_factor(expr)
assert most_common_factor[0] == 3
assert sympy.prod(most_common_factor) == expr
expr = 1 / x
most_common_factor = extract_most_common_factor(expr)
assert most_common_factor[0] == 1
assert sympy.prod(most_common_factor) == expr
assert most_common_factor[1] == expr
def test_count_operations():
x, y, z = sympy.symbols('x y z')
expr = 1/x + y * sympy.sqrt(z)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 1
assert ops['muls'] == 1
assert ops['divs'] == 1
assert ops['sqrts'] == 1
expr = 1 / sympy.sqrt(z)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 0
assert ops['muls'] == 0
assert ops['divs'] == 1
assert ops['sqrts'] == 1
expr = sympy.Rel(1 / sympy.sqrt(z), 5)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 0
assert ops['muls'] == 0
assert ops['divs'] == 1
assert ops['sqrts'] == 1
expr = sympy.sqrt(x + y)
expr = insert_fast_sqrts(expr).atoms(fast_sqrt)
ops = count_operations(*expr, only_type=None)
assert ops['fast_sqrts'] == 1
expr = sympy.sqrt(x / y)
expr = insert_fast_divisions(expr).atoms(fast_division)
ops = count_operations(*expr, only_type=None)
assert ops['fast_div'] == 1
expr = pystencils.Assignment(sympy.Symbol('tmp'), 3 / sympy.sqrt(x + y))
expr = insert_fast_sqrts(expr).atoms(fast_inv_sqrt)
ops = count_operations(*expr, only_type=None)
assert ops['fast_inv_sqrts'] == 1
expr = sympy.Piecewise((1.0, x > 0), (0.0, True)) + y * z
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 1
expr = sympy.Pow(1/x + y * sympy.sqrt(z), 100)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 1
assert ops['muls'] == 99
assert ops['divs'] == 1
assert ops['sqrts'] == 1
expr = DivFunc(x, y)
ops = count_operations(expr, only_type=None)
assert ops['divs'] == 1
expr = DivFunc(x + z, y + z)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 2
assert ops['divs'] == 1
expr = sp.UnevaluatedExpr(sp.Mul(*[x]*100, evaluate=False))
ops = count_operations(expr, only_type=None)
assert ops['muls'] == 99
expr = DivFunc(1, sp.UnevaluatedExpr(sp.Mul(*[x]*100, evaluate=False)))
ops = count_operations(expr, only_type=None)
assert ops['divs'] == 1
assert ops['muls'] == 99
expr = DivFunc(y + z, sp.UnevaluatedExpr(sp.Mul(*[x]*100, evaluate=False)))
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 1
assert ops['divs'] == 1
assert ops['muls'] == 99
def test_common_denominator():
x = sympy.symbols('x')
expr = sympy.Rational(1, 2) + x * sympy.Rational(2, 3)
cm = common_denominator(expr)
assert cm == 6
def test_get_symmetric_part():
x, y, z = sympy.symbols('x y z')
expr = x / 9 - y ** 2 / 6 + z ** 2 / 3 + z / 3
expected_result = x / 9 - y ** 2 / 6 + z ** 2 / 3
sym_part = get_symmetric_part(expr, sympy.symbols(f'y z'))
assert sym_part == expected_result
def test_simplify_by_equality():
x, y, z = sp.symbols('x, y, z')
p, q = sp.symbols('p, q')
# Let x = y + z
expr = x * p - y * p + z * q
expr = simplify_by_equality(expr, x, y, z)
assert expr == z * p + z * q
expr = x * (p - 2 * q) + 2 * q * z
expr = simplify_by_equality(expr, x, y, z)
assert expr == x * p - 2 * q * y
expr = x * (y + z) - y * z
expr = simplify_by_equality(expr, x, y, z)
assert expr == x*y + z**2
# Let x = y + 2
expr = x * p - 2 * p
expr = simplify_by_equality(expr, x, y, 2)
assert expr == y * p
import time
import numpy as np
from pystencils import Assignment
from pystencils import create_kernel
from pystencils.datahandling import create_data_handling
from pystencils.timeloop import TimeLoop
def test_timeloop():
dh = create_data_handling(domain_size=(2, 2), periodicity=True)
pre = dh.add_array('pre_run_field', values_per_cell=1)
dh.fill("pre_run_field", 0.0, ghost_layers=True)
f = dh.add_array('field', values_per_cell=1)
dh.fill("field", 0.0, ghost_layers=True)
post = dh.add_array('post_run_field', values_per_cell=1)
dh.fill("post_run_field", 0.0, ghost_layers=True)
single_step = dh.add_array('single_step_field', values_per_cell=1)
dh.fill("single_step_field", 0.0, ghost_layers=True)
pre_assignments = Assignment(pre.center, pre.center + 1)
pre_kernel = create_kernel(pre_assignments).compile()
assignments = Assignment(f.center, f.center + 1)
kernel = create_kernel(assignments).compile()
post_assignments = Assignment(post.center, post.center + 1)
post_kernel = create_kernel(post_assignments).compile()
single_step_assignments = Assignment(single_step.center, single_step.center + 1)
single_step_kernel = create_kernel(single_step_assignments).compile()
fixed_steps = 2
timeloop = TimeLoop(steps=fixed_steps)
assert timeloop.fixed_steps == fixed_steps
def pre_run():
dh.run_kernel(pre_kernel)
def post_run():
dh.run_kernel(post_kernel)
def single_step_run():
dh.run_kernel(single_step_kernel)
timeloop.add_pre_run_function(pre_run)
timeloop.add_post_run_function(post_run)
timeloop.add_single_step_function(single_step_run)
timeloop.add_call(kernel, {'field': dh.cpu_arrays["field"]})
# the timeloop is initialised with 2 steps. This means a single time step consists of two steps.
# Therefore, we have 2 main iterations and one single step iteration in this configuration
timeloop.run(time_steps=5)
assert np.all(dh.cpu_arrays["pre_run_field"] == 1.0)
assert np.all(dh.cpu_arrays["field"] == 2.0)
assert np.all(dh.cpu_arrays["single_step_field"] == 1.0)
assert np.all(dh.cpu_arrays["post_run_field"] == 1.0)
seconds = 2
start = time.perf_counter()
timeloop.run_time_span(seconds=seconds)
end = time.perf_counter()
# This test case fails often due to time measurements. It is not a good idea to assert here
# np.testing.assert_almost_equal(seconds, end - start, decimal=2)
print("timeloop: ", seconds, " own meassurement: ", end - start)