Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Showing
with 2249 additions and 16 deletions
import numpy as np import numpy as np
import pytest import pytest
from pystencils import Field, Assignment, fields, create_kernel
import pystencils
import sympy as sp import sympy as sp
from pystencils import Assignment, Field, create_kernel, fields
def test_size_check(): def test_size_check():
"""Kernel with two fixed-sized fields creating with same size but calling with wrong size""" """Kernel with two fixed-sized fields creating with same size but calling with wrong size"""
...@@ -22,7 +25,7 @@ def test_size_check(): ...@@ -22,7 +25,7 @@ def test_size_check():
with pytest.raises(ValueError) as e: with pytest.raises(ValueError) as e:
func(src=src, dst=dst) func(src=src, dst=dst)
assert 'Wrong shape' in str(e) assert 'Wrong shape' in str(e.value)
def test_fixed_size_mismatch_check(): def test_fixed_size_mismatch_check():
...@@ -37,7 +40,7 @@ def test_fixed_size_mismatch_check(): ...@@ -37,7 +40,7 @@ def test_fixed_size_mismatch_check():
with pytest.raises(ValueError) as e: with pytest.raises(ValueError) as e:
create_kernel([update_rule]) create_kernel([update_rule])
assert 'Differently sized field accesses' in str(e) assert 'Differently sized field accesses' in str(e.value)
def test_fixed_and_variable_field_check(): def test_fixed_and_variable_field_check():
...@@ -52,7 +55,7 @@ def test_fixed_and_variable_field_check(): ...@@ -52,7 +55,7 @@ def test_fixed_and_variable_field_check():
with pytest.raises(ValueError) as e: with pytest.raises(ValueError) as e:
create_kernel(update_rule) create_kernel(update_rule)
assert 'Mixing fixed-shaped and variable-shape fields' in str(e) assert 'Mixing fixed-shaped and variable-shape fields' in str(e.value)
def test_two_variable_shaped_fields(): def test_two_variable_shaped_fields():
...@@ -69,7 +72,7 @@ def test_two_variable_shaped_fields(): ...@@ -69,7 +72,7 @@ def test_two_variable_shaped_fields():
with pytest.raises(TypeError) as e: with pytest.raises(TypeError) as e:
func(src=src, dst=dst) func(src=src, dst=dst)
assert 'must have same' in str(e) assert 'must have same' in str(e.value)
def test_ssa_checks(): def test_ssa_checks():
...@@ -80,18 +83,18 @@ def test_ssa_checks(): ...@@ -80,18 +83,18 @@ def test_ssa_checks():
create_kernel([Assignment(c, f[0, 1]), create_kernel([Assignment(c, f[0, 1]),
Assignment(c, f[1, 0]), Assignment(c, f[1, 0]),
Assignment(g[0, 0], c)]) Assignment(g[0, 0], c)])
assert 'Assignments not in SSA form' in str(e) assert 'Assignments not in SSA form' in str(e.value)
with pytest.raises(ValueError) as e: with pytest.raises(ValueError) as e:
create_kernel([Assignment(c, a + 3), create_kernel([Assignment(c, a + 3),
Assignment(a, 42), Assignment(a, 42),
Assignment(g[0, 0], c)]) Assignment(g[0, 0], c)])
assert 'Symbol a is written, after it has been read' in str(e) assert 'Symbol a is written, after it has been read' in str(e.value)
with pytest.raises(ValueError) as e: with pytest.raises(ValueError) as e:
create_kernel([Assignment(c, c + 1), create_kernel([Assignment(c, c + 1),
Assignment(g[0, 0], c)]) Assignment(g[0, 0], c)])
assert 'Symbol c is written, after it has been read' in str(e) assert 'Symbol c is written, after it has been read' in str(e.value)
def test_loop_independence_checks(): def test_loop_independence_checks():
...@@ -101,17 +104,24 @@ def test_loop_independence_checks(): ...@@ -101,17 +104,24 @@ def test_loop_independence_checks():
with pytest.raises(ValueError) as e: with pytest.raises(ValueError) as e:
create_kernel([Assignment(g[0, 1], f[0, 1]), create_kernel([Assignment(g[0, 1], f[0, 1]),
Assignment(g[0, 0], f[1, 0])]) Assignment(g[0, 0], f[1, 0])])
assert 'Field g is written at two different locations' in str(e) assert 'Field g is written at two different locations' in str(e.value)
# This is allowed - because only one element of g is accessed # This is not allowed - because this is not SSA (it can be overwritten with allow_double_writes)
with pytest.raises(ValueError) as e:
create_kernel([Assignment(g[0, 2], f[0, 1]),
Assignment(g[0, 2], 2 * g[0, 2])])
# This is allowed - because allow_double_writes is True now
create_kernel([Assignment(g[0, 2], f[0, 1]), create_kernel([Assignment(g[0, 2], f[0, 1]),
Assignment(g[0, 2], 2 * g[0, 2])]) Assignment(g[0, 2], 2 * g[0, 2])],
config=pystencils.CreateKernelConfig(allow_double_writes=True))
create_kernel([Assignment(v[0, 2](1), f[0, 1]), with pytest.raises(ValueError) as e:
Assignment(v[0, 1](0), 4), create_kernel([Assignment(v[0, 2](1), f[0, 1]),
Assignment(v[0, 2](1), 2 * v[0, 2](1))]) Assignment(v[0, 1](0), 4),
Assignment(v[0, 2](1), 2 * v[0, 2](1))])
with pytest.raises(ValueError) as e: with pytest.raises(ValueError) as e:
create_kernel([Assignment(g[0, 1], 3), create_kernel([Assignment(g[0, 1], 3),
Assignment(f[0, 1], 2 * g[0, 2])]) Assignment(f[0, 1], 2 * g[0, 2])])
assert 'Field g is read at (0, 2) and written at (0, 1)' in str(e) assert 'Field g is read at (0, 2) and written at (0, 1)' in str(e.value)
import numpy as np
import sympy as sp
import pytest
from pystencils import (
Assignment,
Field,
TypedSymbol,
create_kernel,
make_slice,
Target,
create_data_handling,
)
from pystencils.simp import sympy_cse_on_assignment_list
@pytest.mark.parametrize("target", [Target.CPU, Target.GPU])
def test_sliced_iteration(target):
if target == Target.GPU:
pytest.importorskip("cupy")
size = (4, 4)
dh = create_data_handling(size, default_target=target, default_ghost_layers=0)
src_field = dh.add_array("src", 1)
dst_field = dh.add_array("dst", 1)
dh.fill(src_field.name, 1.0, ghost_layers=True)
dh.fill(dst_field.name, 0.0, ghost_layers=True)
a, b = sp.symbols("a b")
update_rule = Assignment(
dst_field[0, 0],
(
a * src_field[0, 1]
+ a * src_field[0, -1]
+ b * src_field[1, 0]
+ b * src_field[-1, 0]
)
/ 4,
)
s = make_slice[1:3, 1]
kernel = create_kernel(
sympy_cse_on_assignment_list([update_rule]), iteration_slice=s, target=target
).compile()
if target == Target.GPU:
dh.all_to_gpu()
dh.run_kernel(kernel, a=1.0, b=1.0)
if target == Target.GPU:
dh.all_to_cpu()
expected_result = np.zeros(size)
expected_result[1:3, 1] = 1
np.testing.assert_almost_equal(dh.gather_array(dst_field.name), expected_result)
@pytest.mark.parametrize("target", [Target.CPU, Target.GPU])
def test_symbols_in_slice(target):
if target == Target.GPU:
pytest.xfail("Iteration slices including arbitrary symbols are currently broken on GPU")
size = (4, 4)
dh = create_data_handling(size, default_target=target, default_ghost_layers=0)
src_field = dh.add_array("src", 1)
dst_field = dh.add_array("dst", 1)
dh.fill(src_field.name, 1.0, ghost_layers=True)
dh.fill(dst_field.name, 0.0, ghost_layers=True)
a, b = sp.symbols("a b")
update_rule = Assignment(
dst_field[0, 0],
(
a * src_field[0, 1]
+ a * src_field[0, -1]
+ b * src_field[1, 0]
+ b * src_field[-1, 0]
)
/ 4,
)
x_end = TypedSymbol("x_end", "int")
s = make_slice[1:x_end, 1]
x_end_value = size[1] - 1
kernel = create_kernel(
sympy_cse_on_assignment_list([update_rule]), iteration_slice=s, target=target
).compile()
if target == Target.GPU:
dh.all_to_gpu()
dh.run_kernel(kernel, a=1.0, b=1.0, x_end=x_end_value)
if target == Target.GPU:
dh.all_to_cpu()
expected_result = np.zeros(size)
expected_result[1:x_end_value, 1] = 1
np.testing.assert_almost_equal(dh.gather_array(dst_field.name), expected_result)
import numpy as np
from numpy.testing import assert_array_equal
from pystencils import create_data_handling
from pystencils.slicing import SlicedGetter, make_slice, SlicedGetterDataHandling, shift_slice, slice_intersection
def test_sliced_getter():
def get_slice(slice_obj=None):
arr = np.ones((10, 10))
if slice_obj is None:
slice_obj = make_slice[:, :]
return arr[slice_obj]
sli = SlicedGetter(get_slice)
test = make_slice[2:-2, 2:-2]
assert sli[test].shape == (6, 6)
def test_sliced_getter_data_handling():
domain_shape = (10, 10)
dh = create_data_handling(domain_size=domain_shape, default_ghost_layers=1)
dh.add_array("src", values_per_cell=1)
dh.fill("src", 1.0, ghost_layers=True)
dh.add_array("dst", values_per_cell=1)
dh.fill("dst", 0.0, ghost_layers=True)
sli = SlicedGetterDataHandling(dh, 'dst')
slice_obj = make_slice[2:-2, 2:-2]
assert np.sum(sli[slice_obj]) == 0
sli = SlicedGetterDataHandling(dh, 'src')
slice_obj = make_slice[2:-2, 2:-2]
assert np.sum(sli[slice_obj]) == 36
def test_shift_slice():
sh = shift_slice(make_slice[2:-2, 2:-2], [1, 2])
assert sh[0] == slice(3, -1, None)
assert sh[1] == slice(4, 0, None)
sh = shift_slice(make_slice[2:-2, 2:-2], 1)
assert sh[0] == slice(3, -1, None)
assert sh[1] == slice(3, -1, None)
sh = shift_slice([2, 4], 1)
assert sh[0] == 3
assert sh[1] == 5
sh = shift_slice([2, None], 1)
assert sh[0] == 3
assert sh[1] is None
sh = shift_slice([1.5, 1.5], 1)
assert sh[0] == 1.5
assert sh[1] == 1.5
def test_shifted_array_access():
arr = np.array(range(10))
sh = make_slice[2:5]
assert_array_equal(arr[sh], [2,3,4])
sh = shift_slice(sh, 3)
assert_array_equal(arr[sh], [5,6,7])
arr = np.array([
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
])
sh = make_slice[0:2, 0:2]
assert_array_equal(arr[sh], [[1, 2], [4, 5]])
sh = shift_slice(sh, (1,1))
assert_array_equal(arr[sh], [[5, 6], [8, 9]])
def test_slice_intersection():
sl1 = make_slice[1:10, 1:10]
sl2 = make_slice[5:15, 5:15]
intersection = slice_intersection(sl1, sl2)
assert intersection[0] == slice(5, 10, None)
assert intersection[1] == slice(5, 10, None)
sl2 = make_slice[12:15, 12:15]
intersection = slice_intersection(sl1, sl2)
assert intersection is None
%% Cell type:code id: tags:
``` python
import pytest
pytest.importorskip('waLBerla')
```
%% Output
<module 'waLBerla' from '/Users/holzer/walberla/python/waLBerla/__init__.py'>
%% Cell type:code id: tags:
``` python
from pystencils.session import *
from time import perf_counter
from statistics import median
from functools import partial
```
%% Cell type:markdown id: tags:
## Benchmark for Python call overhead
%% Cell type:code id: tags:
``` python
inner_repeats = 100
outer_repeats = 5
sizes = [2**i for i in range(1, 8)]
sizes
```
%% Output
$\displaystyle \left[ 2, \ 4, \ 8, \ 16, \ 32, \ 64, \ 128\right]$
[2, 4, 8, 16, 32, 64, 128]
%% Cell type:code id: tags:
``` python
def benchmark_pure(domain_size, extract_first=False):
src = np.zeros(domain_size)
dst = np.zeros_like(src)
f_src, f_dst = ps.fields("src, dst", src=src, dst=dst)
kernel = ps.create_kernel(ps.Assignment(f_dst.center, f_src.center)).compile()
if extract_first:
kernel = kernel.kernel
start = perf_counter()
for i in range(inner_repeats):
kernel(src=src, dst=dst)
src, dst = dst, src
end = perf_counter()
else:
start = perf_counter()
for i in range(inner_repeats):
kernel(src=src, dst=dst)
src, dst = dst, src
end = perf_counter()
return (end - start) / inner_repeats
def benchmark_datahandling(domain_size, parallel=False):
dh = ps.create_data_handling(domain_size, parallel=parallel)
f_src = dh.add_array('src')
f_dst = dh.add_array('dst')
kernel = ps.create_kernel(ps.Assignment(f_dst.center, f_src.center)).compile()
start = perf_counter()
for i in range(inner_repeats):
dh.run_kernel(kernel)
dh.swap('src', 'dst')
end = perf_counter()
return (end - start) / inner_repeats
name_to_func = {
'pure_extract': partial(benchmark_pure, extract_first=True),
'pure_no_extract': partial(benchmark_pure, extract_first=False),
'dh_serial': partial(benchmark_datahandling, parallel=False),
'dh_parallel': partial(benchmark_datahandling, parallel=True),
}
```
%% Cell type:code id: tags:
``` python
result = {'block_size': [],
'name': [],
'time': []}
for bs in sizes:
print("Computing size ", bs)
for name, func in name_to_func.items():
for i in range(outer_repeats):
time = func((bs, bs))
result['block_size'].append(bs)
result['name'].append(name)
result['time'].append(time)
```
%% Output
Computing size 2
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
/var/folders/07/0d7kq8fd0sx24cs53zz90_qc0000gp/T/ipykernel_12649/2009975470.py in <module>
7 for name, func in name_to_func.items():
8 for i in range(outer_repeats):
----> 9 time = func((bs, bs))
10 result['block_size'].append(bs)
11 result['name'].append(name)
/var/folders/07/0d7kq8fd0sx24cs53zz90_qc0000gp/T/ipykernel_12649/3509370390.py in benchmark_datahandling(domain_size, parallel)
20
21 def benchmark_datahandling(domain_size, parallel=False):
---> 22 dh = ps.create_data_handling(domain_size, parallel=parallel)
23 f_src = dh.add_array('src')
24 f_dst = dh.add_array('dst')
~/pystencils/pystencils/pystencils/datahandling/__init__.py in create_data_handling(domain_size, periodicity, default_layout, default_target, parallel, default_ghost_layers)
44 if parallel:
45 if wlb is None:
---> 46 raise ValueError("Cannot create parallel data handling because walberla module is not available")
47
48 if periodicity is False or periodicity is None:
ValueError: Cannot create parallel data handling because walberla module is not available
%% Cell type:code id: tags:
``` python
if 'is_test_run' not in globals():
import pandas as pd
import seaborn as sns
data = pd.DataFrame.from_dict(result)
plt.subplot(1,2,1)
sns.barplot(x='block_size', y='time', hue='name', data=data, alpha=0.6)
plt.yscale('log')
plt.subplot(1,2,2)
data = pd.DataFrame.from_dict(result)
sns.barplot(x='block_size', y='time', hue='name', data=data, alpha=0.6)
```
# -*- coding: utf-8 -*-
#
# Copyright © 2019 Stephan Seitz <stephan.seitz@fau.de>
#
# Distributed under terms of the GPLv3 license.
"""
"""
import pystencils
import pystencils.astnodes
import pystencils.config
def test_source_code_comment():
a, b = pystencils.fields('a,b: float[2D]')
assignments = pystencils.AssignmentCollection(
{a.center(): b[0, 2] + b[0, 0]}, {}
)
config = pystencils.config.CreateKernelConfig(target=pystencils.Target.CPU)
ast = pystencils.create_kernel(assignments, config=config)
ast.body.append(pystencils.astnodes.SourceCodeComment("Hallo"))
ast.body.append(pystencils.astnodes.EmptyLine())
ast.body.append(pystencils.astnodes.SourceCodeComment("World!"))
print(ast)
compiled = ast.compile()
assert compiled is not None
pystencils.show_code(ast)
import numpy as np
import sympy as sp
import pytest
import pystencils as ps
from pystencils import x_staggered_vector, TypedSymbol
from pystencils.enums import Target
class TestStaggeredDiffusion:
def _run(self, num_neighbors, target=ps.Target.CPU, openmp=False):
L = (40, 40)
D = 0.066
dt = 1
T = 100
dh = ps.create_data_handling(L, periodicity=True, default_target=target)
c = dh.add_array('c', values_per_cell=1)
j = dh.add_array('j', values_per_cell=num_neighbors, field_type=ps.FieldType.STAGGERED_FLUX)
x_staggered = - c[-1, 0] + c[0, 0]
y_staggered = - c[0, -1] + c[0, 0]
xy_staggered = - c[-1, -1] + c[0, 0]
xY_staggered = - c[-1, 1] + c[0, 0]
jj = j.staggered_access
divergence = -1 * D / (1 + sp.sqrt(2) if j.index_shape[0] == 4 else 1) * \
sum([jj(d) / sp.Matrix(ps.stencil.direction_string_to_offset(d)).norm() for d in j.staggered_stencil
+ [ps.stencil.inverse_direction_string(d) for d in j.staggered_stencil]])
update = [ps.Assignment(c.center, c.center + dt * divergence)]
flux = [ps.Assignment(j.staggered_access("W"), x_staggered),
ps.Assignment(j.staggered_access("S"), y_staggered)]
if j.index_shape[0] == 4:
flux += [ps.Assignment(j.staggered_access("SW"), xy_staggered),
ps.Assignment(j.staggered_access("NW"), xY_staggered)]
staggered_kernel = ps.create_staggered_kernel(flux, target=dh.default_target, cpu_openmp=openmp).compile()
div_kernel = ps.create_kernel(update, target=dh.default_target, cpu_openmp=openmp).compile()
def time_loop(steps):
sync = dh.synchronization_function([c.name])
dh.all_to_gpu()
for i in range(steps):
sync()
dh.run_kernel(staggered_kernel)
dh.run_kernel(div_kernel)
dh.all_to_cpu()
def init():
dh.fill(c.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(c.name, 0)
dh.fill(j.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.cpu_arrays[c.name][L[0] // 2:L[0] // 2 + 2, L[1] // 2:L[1] // 2 + 2] = 1.0
init()
time_loop(T)
reference = np.empty(L)
for x in range(L[0]):
for y in range(L[1]):
r = np.array([x, y]) - L[0] / 2 + 0.5
reference[x, y] = (4 * np.pi * D * T)**(-dh.dim / 2) * np.exp(-np.dot(r, r) / (4 * D * T)) * (2**dh.dim)
assert np.abs(dh.gather_array(c.name) - reference).max() < 5e-4
def test_diffusion_2(self):
self._run(2)
def test_diffusion_4(self):
self._run(4)
def test_diffusion_openmp(self):
self._run(4, openmp=True)
def test_staggered_subexpressions():
dh = ps.create_data_handling((10, 10), periodicity=True, default_target=Target.CPU)
j = dh.add_array('j', values_per_cell=2, field_type=ps.FieldType.STAGGERED)
c = sp.symbols("c")
assignments = [ps.Assignment(j.staggered_access("W"), c),
ps.Assignment(c, 1)]
ps.create_staggered_kernel(assignments, target=dh.default_target).compile()
def test_staggered_loop_cutting():
pytest.importorskip('islpy')
dh = ps.create_data_handling((4, 4), periodicity=True, default_target=Target.CPU)
j = dh.add_array('j', values_per_cell=4, field_type=ps.FieldType.STAGGERED)
assignments = [ps.Assignment(j.staggered_access("SW"), 1)]
ast = ps.create_staggered_kernel(assignments, target=dh.default_target)
assert not ast.atoms(ps.astnodes.Conditional)
def test_staggered_vector():
dim = 2
v = x_staggered_vector(dim)
ctr0 = TypedSymbol('ctr_0', 'int', nonnegative=True)
ctr1 = TypedSymbol('ctr_1', 'int', nonnegative=True)
expected_result = sp.Matrix(tuple((ctr0 + 0.5, ctr1 + 0.5)))
assert v == expected_result
\ No newline at end of file
%% Cell type:code id: tags:
``` python
import pystencils as ps
import sympy as sp
from pystencils.stencil import coefficient_list, plot_expression, plot
```
%% Cell type:code id: tags:
``` python
sten = ((0, 0, 0), (1, 0, 0), (0, 1, 0), (0, 0, 1),
(-1, 0, 0), (0, -1, 0), (0, 0, -1))
plot(sten)
```
%% Output
import pystencils as ps
import sympy as sp
from pystencils.stencil import coefficient_list, plot_expression
import pystencils.plot as plt
def test_coefficient_list():
f = ps.fields("f: double[1D]")
expr = 2 * f[1] + 3 * f[-1]
coff = coefficient_list(expr)
assert coff == [3, 0, 2]
figure = plt.figure()
plot_expression(expr, matrix_form=True, figure=figure)
f = ps.fields("f: double[3D]")
expr = 2 * f[1, 0, 0] + 3 * f[0, -1, 0]
coff = coefficient_list(expr)
assert coff == [[[0, 3, 0], [0, 0, 2], [0, 0, 0]]]
expr = 2 * f[1, 0, 0] + 3 * f[0, -1, 0] + 4 * f[0, 0, 1]
coff = coefficient_list(expr, matrix_form=True)
assert coff[0] == sp.zeros(3, 3)
# in 3D plot only works if there are entries on every of the three 2D planes. In the above examples z-1 was empty
expr = 2 * f[1, 0, 0] + 1 * f[0, -1, 0] + 1 * f[0, 0, 1] + f[0, 0, -1]
figure = plt.figure()
plot_expression(expr, figure=figure)
def test_plot_expression():
f = ps.fields("f: double[2D]")
figure = plt.figure()
plot_expression(2 * f[1, 0] + 3 * f[0, -1], matrix_form=True, figure=figure)
import numpy as np import numpy as np
from pystencils import Field, create_kernel, Assignment
from pystencils import Assignment, Field, create_kernel
def test_fixed_sized_field(): def test_fixed_sized_field():
......
from pystencils import fields, Assignment, AssignmentCollection
from pystencils.simp.subexpression_insertion import *
def test_subexpression_insertion():
f, g = fields('f(10), g(10) : [2D]')
xi = sp.symbols('xi_:10')
xi_set = set(xi)
subexpressions = [
Assignment(xi[0], -f(4)),
Assignment(xi[1], -(f(1) * f(2))),
Assignment(xi[2], 2.31 * f(5)),
Assignment(xi[3], 1.8 + f(5) + f(6)),
Assignment(xi[4], 5.7 + f(6)),
Assignment(xi[5], (f(4) + f(5))**2),
Assignment(xi[6], f(3)**2),
Assignment(xi[7], f(4)),
Assignment(xi[8], 13),
Assignment(xi[9], 0),
]
assignments = [Assignment(g(i), x) for i, x in enumerate(xi)]
ac = AssignmentCollection(assignments, subexpressions=subexpressions)
ac_ins = insert_symbol_times_minus_one(ac)
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[0]})
ac_ins = insert_constant_multiples(ac)
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[0], xi[2]})
ac_ins = insert_constant_additions(ac)
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[4]})
ac_ins = insert_squares(ac)
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[6]})
ac_ins = insert_aliases(ac)
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[7]})
ac_ins = insert_zeros(ac)
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[9]})
ac_ins = insert_constants(ac, skip={xi[9]})
assert (ac_ins.bound_symbols & xi_set) == (xi_set - {xi[8]})
# -*- coding: utf-8 -*-
#
# Copyright © 2019 Stephan Seitz <stephan.seitz@fau.de>
#
# Distributed under terms of the GPLv3 license.
"""
"""
import pytest
import numpy as np
import pystencils.config
import sympy as sp
import sympy.abc
import pystencils as ps
from pystencils.typing import create_type
@pytest.mark.parametrize('dtype', ["float64", "float32"])
def test_sum(dtype):
sum = sp.Sum(sp.abc.k, (sp.abc.k, 1, 100))
expanded_sum = sum.doit()
# print(sum)
# print(expanded_sum)
x = ps.fields(f'x: {dtype}[1d]')
assignments = ps.AssignmentCollection({x.center(): sum})
ast = ps.create_kernel(assignments)
code = ps.get_code_str(ast)
kernel = ast.compile()
# ps.show_code(ast)
if dtype == "float32":
assert "5050.0f;" in code
array = np.zeros((10,), dtype=dtype)
kernel(x=array)
assert np.allclose(array, int(expanded_sum) * np.ones_like(array))
@pytest.mark.parametrize('dtype', ["int32", "int64", "float64", "float32"])
def test_product(dtype):
k = ps.TypedSymbol('k', create_type(dtype))
sum = sympy.Product(k, (k, 1, 10))
expanded_sum = sum.doit()
# print(sum)
# print(expanded_sum)
x = ps.fields(f'x: {dtype}[1d]')
assignments = ps.AssignmentCollection({x.center(): sum})
config = pystencils.config.CreateKernelConfig()
ast = ps.create_kernel(assignments, config=config)
code = ps.get_code_str(ast)
kernel = ast.compile()
# print(code)
if dtype == "int64" or dtype == "int32":
assert '3628800;' in code
elif dtype == "float32":
assert '3628800.0f;' in code
else:
assert '3628800.0;' in code
array = np.zeros((10,), dtype=dtype)
kernel(x=array)
assert np.allclose(array, int(expanded_sum) * np.ones_like(array))
# TODO: See Issue !55
# def test_prod_var_limit():
#
# k = ps.TypedSymbol('k', create_type('int64'))
# limit = ps.TypedSymbol('limit', create_type('int64'))
#
# sum = sympy.Sum(k, (k, 1, limit))
# expanded_sum = sum.replace(limit, 100).doit()
#
# print(sum)
# print(expanded_sum)
#
# x = ps.fields('x: int64[1d]')
#
# assignments = ps.AssignmentCollection({x.center(): sum})
#
# ast = ps.create_kernel(assignments)
# ps.show_code(ast)
# kernel = ast.compile()
#
# array = np.zeros((10,), np.int64)
#
# kernel(x=array, limit=100)
#
# assert np.allclose(array, int(expanded_sum) * np.ones_like(array))
import sympy
import numpy as np
import sympy as sp
import pystencils
from pystencils.sympyextensions import replace_second_order_products
from pystencils.sympyextensions import remove_higher_order_terms
from pystencils.sympyextensions import complete_the_squares_in_exp
from pystencils.sympyextensions import extract_most_common_factor
from pystencils.sympyextensions import simplify_by_equality
from pystencils.sympyextensions import count_operations
from pystencils.sympyextensions import common_denominator
from pystencils.sympyextensions import get_symmetric_part
from pystencils.sympyextensions import scalar_product
from pystencils.sympyextensions import kronecker_delta
from pystencils import Assignment
from pystencils.functions import DivFunc
from pystencils.fast_approximation import (fast_division, fast_inv_sqrt, fast_sqrt,
insert_fast_divisions, insert_fast_sqrts)
def test_utility():
a = [1, 2]
b = (2, 3)
a_np = np.array(a)
b_np = np.array(b)
assert scalar_product(a, b) == np.dot(a_np, b_np)
a = sympy.Symbol("a")
b = sympy.Symbol("b")
assert kronecker_delta(a, a, a, b) == 0
assert kronecker_delta(a, a, a, a) == 1
assert kronecker_delta(3, 3, 3, 2) == 0
assert kronecker_delta(2, 2, 2, 2) == 1
assert kronecker_delta([10] * 100) == 1
assert kronecker_delta((0, 1), (0, 1)) == 1
def test_replace_second_order_products():
x, y = sympy.symbols('x y')
expr = 4 * x * y
expected_expr_positive = 2 * ((x + y) ** 2 - x ** 2 - y ** 2)
expected_expr_negative = 2 * (-(x - y) ** 2 + x ** 2 + y ** 2)
result = replace_second_order_products(expr, search_symbols=[x, y], positive=True)
assert result == expected_expr_positive
assert (result - expected_expr_positive).simplify() == 0
result = replace_second_order_products(expr, search_symbols=[x, y], positive=False)
assert result == expected_expr_negative
assert (result - expected_expr_negative).simplify() == 0
result = replace_second_order_products(expr, search_symbols=[x, y], positive=None)
assert result == expected_expr_positive
a = [Assignment(sympy.symbols('z'), x + y)]
replace_second_order_products(expr, search_symbols=[x, y], positive=True, replace_mixed=a)
assert len(a) == 2
assert replace_second_order_products(4 + y, search_symbols=[x, y]) == y + 4
def test_remove_higher_order_terms():
x, y = sympy.symbols('x y')
expr = sympy.Mul(x, y)
result = remove_higher_order_terms(expr, order=1, symbols=[x, y])
assert result == 0
result = remove_higher_order_terms(expr, order=2, symbols=[x, y])
assert result == expr
expr = sympy.Pow(x, 3)
result = remove_higher_order_terms(expr, order=2, symbols=[x, y])
assert result == 0
result = remove_higher_order_terms(expr, order=3, symbols=[x, y])
assert result == expr
def test_complete_the_squares_in_exp():
a, b, c, s, n = sympy.symbols('a b c s n')
expr = a * s ** 2 + b * s + c
result = complete_the_squares_in_exp(expr, symbols_to_complete=[s])
assert result == expr
expr = sympy.exp(a * s ** 2 + b * s + c)
expected_result = sympy.exp(a*s**2 + c - b**2 / (4*a))
result = complete_the_squares_in_exp(expr, symbols_to_complete=[s])
assert result == expected_result
def test_extract_most_common_factor():
x, y = sympy.symbols('x y')
expr = 1 / (x + y) + 3 / (x + y) + 3 / (x + y)
most_common_factor = extract_most_common_factor(expr)
assert most_common_factor[0] == 7
assert sympy.prod(most_common_factor) == expr
expr = 1 / x + 3 / (x + y) + 3 / y
most_common_factor = extract_most_common_factor(expr)
assert most_common_factor[0] == 3
assert sympy.prod(most_common_factor) == expr
expr = 1 / x
most_common_factor = extract_most_common_factor(expr)
assert most_common_factor[0] == 1
assert sympy.prod(most_common_factor) == expr
assert most_common_factor[1] == expr
def test_count_operations():
x, y, z = sympy.symbols('x y z')
expr = 1/x + y * sympy.sqrt(z)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 1
assert ops['muls'] == 1
assert ops['divs'] == 1
assert ops['sqrts'] == 1
expr = 1 / sympy.sqrt(z)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 0
assert ops['muls'] == 0
assert ops['divs'] == 1
assert ops['sqrts'] == 1
expr = sympy.Rel(1 / sympy.sqrt(z), 5)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 0
assert ops['muls'] == 0
assert ops['divs'] == 1
assert ops['sqrts'] == 1
expr = sympy.sqrt(x + y)
expr = insert_fast_sqrts(expr).atoms(fast_sqrt)
ops = count_operations(*expr, only_type=None)
assert ops['fast_sqrts'] == 1
expr = sympy.sqrt(x / y)
expr = insert_fast_divisions(expr).atoms(fast_division)
ops = count_operations(*expr, only_type=None)
assert ops['fast_div'] == 1
expr = pystencils.Assignment(sympy.Symbol('tmp'), 3 / sympy.sqrt(x + y))
expr = insert_fast_sqrts(expr).atoms(fast_inv_sqrt)
ops = count_operations(*expr, only_type=None)
assert ops['fast_inv_sqrts'] == 1
expr = sympy.Piecewise((1.0, x > 0), (0.0, True)) + y * z
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 1
expr = sympy.Pow(1/x + y * sympy.sqrt(z), 100)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 1
assert ops['muls'] == 99
assert ops['divs'] == 1
assert ops['sqrts'] == 1
expr = DivFunc(x, y)
ops = count_operations(expr, only_type=None)
assert ops['divs'] == 1
expr = DivFunc(x + z, y + z)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 2
assert ops['divs'] == 1
expr = sp.UnevaluatedExpr(sp.Mul(*[x]*100, evaluate=False))
ops = count_operations(expr, only_type=None)
assert ops['muls'] == 99
expr = DivFunc(1, sp.UnevaluatedExpr(sp.Mul(*[x]*100, evaluate=False)))
ops = count_operations(expr, only_type=None)
assert ops['divs'] == 1
assert ops['muls'] == 99
expr = DivFunc(y + z, sp.UnevaluatedExpr(sp.Mul(*[x]*100, evaluate=False)))
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 1
assert ops['divs'] == 1
assert ops['muls'] == 99
def test_common_denominator():
x = sympy.symbols('x')
expr = sympy.Rational(1, 2) + x * sympy.Rational(2, 3)
cm = common_denominator(expr)
assert cm == 6
def test_get_symmetric_part():
x, y, z = sympy.symbols('x y z')
expr = x / 9 - y ** 2 / 6 + z ** 2 / 3 + z / 3
expected_result = x / 9 - y ** 2 / 6 + z ** 2 / 3
sym_part = get_symmetric_part(expr, sympy.symbols(f'y z'))
assert sym_part == expected_result
def test_simplify_by_equality():
x, y, z = sp.symbols('x, y, z')
p, q = sp.symbols('p, q')
# Let x = y + z
expr = x * p - y * p + z * q
expr = simplify_by_equality(expr, x, y, z)
assert expr == z * p + z * q
expr = x * (p - 2 * q) + 2 * q * z
expr = simplify_by_equality(expr, x, y, z)
assert expr == x * p - 2 * q * y
expr = x * (y + z) - y * z
expr = simplify_by_equality(expr, x, y, z)
assert expr == x*y + z**2
# Let x = y + 2
expr = x * p - 2 * p
expr = simplify_by_equality(expr, x, y, 2)
assert expr == y * p
import time
import numpy as np
from pystencils import Assignment
from pystencils import create_kernel
from pystencils.datahandling import create_data_handling
from pystencils.timeloop import TimeLoop
def test_timeloop():
dh = create_data_handling(domain_size=(2, 2), periodicity=True)
pre = dh.add_array('pre_run_field', values_per_cell=1)
dh.fill("pre_run_field", 0.0, ghost_layers=True)
f = dh.add_array('field', values_per_cell=1)
dh.fill("field", 0.0, ghost_layers=True)
post = dh.add_array('post_run_field', values_per_cell=1)
dh.fill("post_run_field", 0.0, ghost_layers=True)
single_step = dh.add_array('single_step_field', values_per_cell=1)
dh.fill("single_step_field", 0.0, ghost_layers=True)
pre_assignments = Assignment(pre.center, pre.center + 1)
pre_kernel = create_kernel(pre_assignments).compile()
assignments = Assignment(f.center, f.center + 1)
kernel = create_kernel(assignments).compile()
post_assignments = Assignment(post.center, post.center + 1)
post_kernel = create_kernel(post_assignments).compile()
single_step_assignments = Assignment(single_step.center, single_step.center + 1)
single_step_kernel = create_kernel(single_step_assignments).compile()
fixed_steps = 2
timeloop = TimeLoop(steps=fixed_steps)
assert timeloop.fixed_steps == fixed_steps
def pre_run():
dh.run_kernel(pre_kernel)
def post_run():
dh.run_kernel(post_kernel)
def single_step_run():
dh.run_kernel(single_step_kernel)
timeloop.add_pre_run_function(pre_run)
timeloop.add_post_run_function(post_run)
timeloop.add_single_step_function(single_step_run)
timeloop.add_call(kernel, {'field': dh.cpu_arrays["field"]})
# the timeloop is initialised with 2 steps. This means a single time step consists of two steps.
# Therefore, we have 2 main iterations and one single step iteration in this configuration
timeloop.run(time_steps=5)
assert np.all(dh.cpu_arrays["pre_run_field"] == 1.0)
assert np.all(dh.cpu_arrays["field"] == 2.0)
assert np.all(dh.cpu_arrays["single_step_field"] == 1.0)
assert np.all(dh.cpu_arrays["post_run_field"] == 1.0)
seconds = 2
start = time.perf_counter()
timeloop.run_time_span(seconds=seconds)
end = time.perf_counter()
# This test case fails often due to time measurements. It is not a good idea to assert here
# np.testing.assert_almost_equal(seconds, end - start, decimal=2)
print("timeloop: ", seconds, " own meassurement: ", end - start)
import sympy as sp
import numpy as np
import pystencils as ps
from pystencils import fields, TypedSymbol
from pystencils.astnodes import LoopOverCoordinate, SympyAssignment
from pystencils.typing import create_type
from pystencils.transformations import (
filtered_tree_iteration, get_loop_hierarchy, get_loop_counter_symbol_hierarchy,
iterate_loops_by_depth, split_inner_loop, loop_blocking
)
from pystencils.cpu import add_pragmas
def test_loop_information():
f, g = ps.fields("f, g: double[2D]")
update_rule = ps.Assignment(g[0, 0], f[0, 0])
ast = ps.create_kernel(update_rule)
inner_loops = [loop for loop in filtered_tree_iteration(ast, LoopOverCoordinate, stop_type=SympyAssignment)
if loop.is_innermost_loop]
loop_order = []
for i in get_loop_hierarchy(inner_loops[0].args[0]):
loop_order.append(i)
assert loop_order == [0, 1]
loop_symbols = get_loop_counter_symbol_hierarchy(inner_loops[0].args[0])
assert loop_symbols == [TypedSymbol("ctr_1", create_type("int"), nonnegative=True),
TypedSymbol("ctr_0", create_type("int"), nonnegative=True)]
def test_iterate_loops_by_depth():
f, g = ps.fields("f, g: double[3D]", layout="fzyx")
x = ps.TypedSymbol('x', np.float64)
subs = [ps.Assignment(x, f[0, 0, 0])]
mains = [ps.Assignment(g[0, 0, 0], x)]
ac = ps.AssignmentCollection(mains, subexpressions=subs)
config = ps.CreateKernelConfig(cpu_blocking=(0, 16, 0))
ast = ps.create_kernel(ac, config=config)
split_inner_loop(ast, [[x], [g[0,0,0]]])
loops = list(iterate_loops_by_depth(ast, 0))
assert len(loops) == 1
assert loops[0].loop_counter_symbol.name == "_blockctr_1"
loops = list(iterate_loops_by_depth(ast, 1))
assert len(loops) == 1
assert loops[0].loop_counter_symbol.name == "ctr_2"
loops = list(iterate_loops_by_depth(ast, 2))
assert len(loops) == 1
assert loops[0].loop_counter_symbol.name == "ctr_1"
loops = list(iterate_loops_by_depth(ast, 3))
assert len(loops) == 2
assert loops[0].loop_counter_symbol.name == "ctr_0"
assert loops[1].loop_counter_symbol.name == "ctr_0"
innermost = list(iterate_loops_by_depth(ast, -1))
assert loops == innermost
def test_split_optimisation():
src, dst = fields(f"src(9), dst(9): [2D]", layout='fzyx')
stencil = ((0, 0), (0, 1), (0, -1), (-1, 0), (1, 0), (-1, 1), (1, 1), (-1, -1), (1, -1))
w = [sp.Rational(4, 9)]
w += [sp.Rational(1, 9)] * 4
w += [sp.Rational(1, 36)] * 4
cs = sp.Rational(1, 3)
subexpressions = []
main_assignements = []
rho = sp.symbols("rho")
velo = sp.symbols("u_:2")
density = 0
velocity_x = 0
velocity_y = 0
for d in stencil:
density += src[d]
velocity_x += d[0] * src[d]
velocity_y += d[1] * src[d]
subexpressions.append(ps.Assignment(rho, density))
subexpressions.append(ps.Assignment(velo[0], velocity_x))
subexpressions.append(ps.Assignment(velo[1], velocity_y))
for i, d in enumerate(stencil):
u_d = velo[0] * d[0] + velo[1] * d[1]
u_2 = velo[0] * velo[0] + velo[1] * velo[1]
expr = w[i] * rho * (1 + u_d / cs + u_d ** 2 / (2 * cs ** 2) - u_2 / (2 * cs))
main_assignements.append(ps.Assignment(dst.center_vector[i], expr))
ac = ps.AssignmentCollection(main_assignments=main_assignements, subexpressions=subexpressions)
simplification_hint = {'density': rho,
'velocity': (velo[0], velo[1]),
'split_groups': [[rho, velo[0], velo[1], dst.center_vector[0]],
[dst.center_vector[1], dst.center_vector[2]],
[dst.center_vector[3], dst.center_vector[4]],
[dst.center_vector[5], dst.center_vector[6]],
[dst.center_vector[7], dst.center_vector[8]]]}
ac.simplification_hints = simplification_hint
ast = ps.create_kernel(ac)
code = ps.get_code_str(ast)
# after the split optimisation the two for loops are split into 6
assert code.count("for") == 6
print(code)
def test_pragmas():
f, g = ps.fields("f, g: double[3D]", layout="fzyx")
x = ps.TypedSymbol('x', np.float64)
subs = [ps.Assignment(x, f[0, 0, 0])]
mains = [ps.Assignment(g[0, 0, 0], x)]
ac = ps.AssignmentCollection(mains, subexpressions=subs)
def prepend_omp_pragmas(ast):
add_pragmas(ast, ["#pragma omp for schedule(dynamic)"], nesting_depth=0)
add_pragmas(ast, ["#pragma omp simd simdlen(8)"], nesting_depth=-1)
ast_passes = [prepend_omp_pragmas]
config = ps.CreateKernelConfig(target=ps.Target.CPU, cpu_prepend_optimizations=ast_passes)
ast = ps.create_kernel(ac, config=config)
code = ps.get_code_str(ast)
assert code.find("#pragma omp for schedule(dynamic)") != -1
assert code.find("#pragma omp simd simdlen(8)") != -1
loops = [loop for loop in filtered_tree_iteration(ast, LoopOverCoordinate, stop_type=SympyAssignment)]
innermost = list(filter(lambda n: n.is_innermost_loop, loops))
assert innermost[0].prefix_lines == ["#pragma omp simd simdlen(8)"]
outermost = list(filter(lambda n: n.is_outermost_loop, loops))
assert outermost[0].prefix_lines == ["#pragma omp for schedule(dynamic)"]
from sympy.abc import a, b, c, d, e, f, g
import pystencils
from pystencils.typing import CastFunc, create_type
def test_type_interference():
x = pystencils.fields('x: float32[3d]')
assignments = pystencils.AssignmentCollection({
a: CastFunc(10, create_type('float64')),
b: CastFunc(10, create_type('uint16')),
e: 11,
c: b,
f: c + b,
d: c + b + x.center + e,
x.center: c + b + x.center,
g: a + b + d
})
ast = pystencils.create_kernel(assignments)
code = pystencils.get_code_str(ast)
# print(code)
assert 'const double a' in code
assert 'const uint16_t b' in code
assert 'const uint16_t f' in code
assert 'const int64_t e' in code
assert 'const float d = ((float)(b)) + ((float)(c)) + ((float)(e)) + ' \
'_data_x[_stride_x_0*ctr_0 + _stride_x_1*ctr_1 + _stride_x_2*ctr_2];' in code
assert '_data_x[_stride_x_0*ctr_0 + _stride_x_1*ctr_1 + _stride_x_2*ctr_2] = (' \
'(float)(b)) + ((float)(c)) + _data_x[_stride_x_0*ctr_0 + _stride_x_1*ctr_1 + _stride_x_2*ctr_2];' in code
assert 'const double g = a + ((double)(b)) + ((double)(d));' in code
import pytest
import pystencils.config
import sympy as sp
import numpy as np
import pystencils as ps
from pystencils.typing import TypedSymbol, get_type_of_expression, VectorType, collate_types, \
typed_symbols, CastFunc, PointerArithmeticFunc, PointerType, result_type, BasicType
def test_result_type():
i = np.dtype('int32')
l = np.dtype('int64')
ui = np.dtype('uint32')
ul = np.dtype('uint64')
f = np.dtype('float32')
d = np.dtype('float64')
b = np.dtype('bool')
assert result_type(i, l) == l
assert result_type(l, i) == l
assert result_type(ui, i) == i
assert result_type(ui, l) == l
assert result_type(ul, i) == i
assert result_type(ul, l) == l
assert result_type(d, f) == d
assert result_type(f, d) == d
assert result_type(i, f) == f
assert result_type(l, f) == f
assert result_type(ui, f) == f
assert result_type(ul, f) == f
assert result_type(i, d) == d
assert result_type(l, d) == d
assert result_type(ui, d) == d
assert result_type(ul, d) == d
assert result_type(b, i) == i
assert result_type(b, l) == l
assert result_type(b, ui) == ui
assert result_type(b, ul) == ul
assert result_type(b, f) == f
assert result_type(b, d) == d
@pytest.mark.parametrize('dtype', ('float64', 'float32', 'int64', 'int32', 'uint32', 'uint64'))
def test_simple_add(dtype):
constant = 1.0
if dtype[0] in 'ui':
constant = 1
f = ps.fields(f"f: {dtype}[1D]")
d = TypedSymbol("d", dtype)
test_arr = np.array([constant], dtype=dtype)
ur = ps.Assignment(f[0], f[0] + d)
ast = ps.create_kernel(ur)
code = ps.get_code_str(ast)
kernel = ast.compile()
kernel(f=test_arr, d=constant)
assert test_arr[0] == constant+constant
@pytest.mark.parametrize('dtype1', ('float64', 'float32', 'int64', 'int32', 'uint32', 'uint64'))
@pytest.mark.parametrize('dtype2', ('float64', 'float32', 'int64', 'int32', 'uint32', 'uint64'))
def test_mixed_add(dtype1, dtype2):
constant = 1
f = ps.fields(f"f: {dtype1}[1D]")
g = ps.fields(f"g: {dtype2}[1D]")
test_f = np.array([constant], dtype=dtype1)
test_g = np.array([constant], dtype=dtype2)
ur = ps.Assignment(f[0], f[0] + g[0])
# TODO Markus: check for the logging if colate_types(dtype1, dtype2) != dtype1
ast = ps.create_kernel(ur)
code = ps.get_code_str(ast)
kernel = ast.compile()
kernel(f=test_f, g=test_g)
assert test_f[0] == constant+constant
def test_collation():
double_type = BasicType('float64')
float_type = BasicType('float32')
double4_type = VectorType(double_type, 4)
float4_type = VectorType(float_type, 4)
assert collate_types([double_type, float_type]) == double_type
assert collate_types([double4_type, float_type]) == double4_type
assert collate_types([double4_type, float4_type]) == double4_type
def test_vector_type():
double_type = BasicType('float64')
float_type = BasicType('float32')
double4_type = VectorType(double_type, 4)
float4_type = VectorType(float_type, 4)
assert double4_type.item_size == 4
assert float4_type.item_size == 4
double4_type2 = VectorType(double_type, 4)
assert double4_type == double4_type2
assert double4_type != 4
assert double4_type != float4_type
def test_pointer_type():
double_type = BasicType('float64')
float_type = BasicType('float32')
double4_type = PointerType(double_type, restrict=True)
float4_type = PointerType(float_type, restrict=False)
assert double4_type.item_size == 1
assert float4_type.item_size == 1
assert not double4_type == 4
assert not double4_type.alias
assert float4_type.alias
def test_dtype_of_constants():
# Some come constants are neither of type Integer,Float,Rational and don't have args
# >>> isinstance(pi, Integer)
# False
# >>> isinstance(pi, Float)
# False
# >>> isinstance(pi, Rational)
# False
# >>> pi.args
# ()
get_type_of_expression(sp.pi)
def test_assumptions():
x = ps.fields('x: float32[3d]')
assert x.shape[0].is_nonnegative
assert (2 * x.shape[0]).is_nonnegative
assert (2 * x.shape[0]).is_integer
assert (TypedSymbol('a', BasicType('uint64'))).is_nonnegative
assert (TypedSymbol('a', BasicType('uint64'))).is_positive is None
assert (TypedSymbol('a', BasicType('uint64')) + 1).is_positive
assert (x.shape[0] + 1).is_real
@pytest.mark.parametrize('dtype', ('float64', 'float32'))
def test_sqrt_of_integer(dtype):
"""Regression test for bug where sqrt(3) was classified as integer"""
f = ps.fields(f'f: {dtype}[1D]')
tmp = sp.symbols('tmp')
assignments = [ps.Assignment(tmp, sp.sqrt(3)),
ps.Assignment(f[0], tmp)]
arr = np.array([1], dtype=dtype)
config = pystencils.config.CreateKernelConfig(data_type=dtype, default_number_float=dtype)
ast = ps.create_kernel(assignments, config=config)
kernel = ast.compile()
kernel(f=arr)
assert 1.7 < arr[0] < 1.8
code = ps.get_code_str(ast)
constant = '1.7320508075688772f'
if dtype == 'float32':
assert constant in code
else:
assert constant not in code
@pytest.mark.parametrize('dtype', ('float64', 'float32'))
def test_integer_comparision(dtype):
f = ps.fields(f"f: {dtype}[2D]")
d = TypedSymbol("dir", "int64")
ur = ps.Assignment(f[0, 0], sp.Piecewise((0, sp.Equality(d, 1)), (f[0, 0], True)))
ast = ps.create_kernel(ur)
code = ps.get_code_str(ast)
# There should be an explicit cast for the integer zero to the type of the field on the rhs
if dtype == 'float64':
t = "_data_f[_stride_f_0*ctr_0 + _stride_f_1*ctr_1] = " \
"((((dir) == (1))) ? (0.0): (_data_f[_stride_f_0*ctr_0 + _stride_f_1*ctr_1]));"
else:
t = "_data_f[_stride_f_0*ctr_0 + _stride_f_1*ctr_1] = " \
"((((dir) == (1))) ? (0.0f): (_data_f[_stride_f_0*ctr_0 + _stride_f_1*ctr_1]));"
assert t in code
def test_typed_symbols_dtype():
assert typed_symbols(("s", "f"), np.uint) == typed_symbols("s, f", np.uint)
t_symbols = typed_symbols(("s", "f"), np.uint)
s = t_symbols[0]
assert t_symbols[0] == TypedSymbol("s", np.uint)
assert s.dtype.is_uint()
assert typed_symbols("s", np.float64).dtype.c_name == 'double'
assert typed_symbols("s", np.float32).dtype.c_name == 'float'
assert TypedSymbol("s", np.uint).canonical == TypedSymbol("s", np.uint)
assert TypedSymbol("s", np.uint).reversed == TypedSymbol("s", np.uint)
def test_cast_func():
assert CastFunc(TypedSymbol("s", np.uint), np.int64).canonical == TypedSymbol("s", np.uint).canonical
a = CastFunc(5, np.uint)
assert a.is_negative is False
assert a.is_nonnegative
def test_pointer_arithmetic_func():
assert PointerArithmeticFunc(TypedSymbol("s", np.uint), 1).canonical == TypedSymbol("s", np.uint).canonical
def test_division():
f = ps.fields('f(10): float32[2D]')
m, tau = sp.symbols("m, tau")
up = [ps.Assignment(tau, 1 / (0.5 + (3.0 * m))),
ps.Assignment(f.center, tau)]
config = pystencils.config.CreateKernelConfig(data_type='float32', default_number_float='float32')
ast = ps.create_kernel(up, config=config)
code = ps.get_code_str(ast)
assert "((1.0f) / (m*3.0f + 0.5f))" in code
def test_pow():
f = ps.fields('f(10): float32[2D]')
m, tau = sp.symbols("m, tau")
up = [ps.Assignment(tau, m ** 1.5),
ps.Assignment(f.center, tau)]
config = pystencils.config.CreateKernelConfig(data_type="float32", default_number_float='float32')
ast = ps.create_kernel(up, config=config)
code = ps.get_code_str(ast)
assert "1.5f" in code
import pytest import pytest
import sympy as sp import sympy as sp
from pystencils.utils import LinearEquationSystem from pystencils.utils import LinearEquationSystem
from pystencils.utils import DotDict
def test_linear_equation_system(): def test_linear_equation_system():
...@@ -30,4 +31,51 @@ def test_linear_equation_system(): ...@@ -30,4 +31,51 @@ def test_linear_equation_system():
with pytest.raises(ValueError) as e: with pytest.raises(ValueError) as e:
m.add_equation(x**2 - 1) m.add_equation(x**2 - 1)
assert 'Not a linear equation' in str(e) assert 'Not a linear equation' in str(e.value)
x, y, z = sp.symbols("x, y, z")
les = LinearEquationSystem([x, y, z])
les.add_equation(1 * x + 2 * y - 1 * z + 4)
les.add_equation(2 * x + 1 * y + 1 * z - 2)
les.add_equation(1 * x + 2 * y + 1 * z + 2)
# usually reduce is not necessary since almost every function of LinearEquationSystem calls reduce beforehand
les.reduce()
expected_matrix = sp.Matrix([[1, 0, 0, sp.Rational(5, 3)],
[0, 1, 0, sp.Rational(-7, 3)],
[0, 0, 1, sp.Integer(1)]])
assert les.matrix == expected_matrix
assert les.rank == 3
sol = les.solution()
assert sol[x] == sp.Rational(5, 3)
assert sol[y] == sp.Rational(-7, 3)
assert sol[z] == sp.Integer(1)
les = LinearEquationSystem([x, y])
assert les.solution_structure() == 'multiple'
les.add_equation(x + 1)
assert les.solution_structure() == 'multiple'
les.add_equation(y + 2)
assert les.solution_structure() == 'single'
les.add_equation(x + y + 5)
assert les.solution_structure() == 'none'
def test_dot_dict():
d = {'a': {'c': 7}, 'b': 6}
t = DotDict(d)
assert t.a.c == 7
assert t.b == 6
assert len(t) == 2
delattr(t, 'b')
assert len(t) == 1
t.b = 6
assert len(t) == 2
assert t.b == 6
import numpy as np
import pytest
import pystencils.config
import sympy as sp
import pystencils as ps
import pystencils.astnodes as ast
from pystencils.backends.simd_instruction_sets import get_supported_instruction_sets, get_vector_instruction_set
from pystencils.cpu.vectorization import vectorize
from pystencils.enums import Target
from pystencils.transformations import replace_inner_stride_with_one
supported_instruction_sets = get_supported_instruction_sets()
if supported_instruction_sets:
instruction_set = supported_instruction_sets[-1]
else:
instruction_set = None
# TODO: Skip tests if no instruction set is available and check all codes if they are really vectorised !
def test_vector_type_propagation1(instruction_set=instruction_set):
a, b, c, d, e = sp.symbols("a b c d e")
arr = np.ones((2 ** 2 + 2, 2 ** 3 + 2))
arr *= 10.0
f, g = ps.fields(f=arr, g=arr)
update_rule = [ps.Assignment(a, f[1, 0]),
ps.Assignment(b, a),
ps.Assignment(g[0, 0], b + 3 + f[0, 1])]
ast = ps.create_kernel(update_rule)
vectorize(ast, instruction_set=instruction_set)
# ps.show_code(ast)
func = ast.compile()
dst = np.zeros_like(arr)
func(g=dst, f=arr)
np.testing.assert_equal(dst[1:-1, 1:-1], 2 * 10.0 + 3)
def test_vector_type_propagation2(instruction_set=instruction_set):
data_type = 'float32'
assume_aligned = True
assume_inner_stride_one = True
assume_sufficient_line_padding = True
domain_size = (24, 24)
dh = ps.create_data_handling(domain_size, default_target=Target.CPU)
# fields
g = dh.add_array("g", values_per_cell=1, dtype=data_type, alignment=True, ghost_layers=0)
f = dh.add_array("f", values_per_cell=1, dtype=data_type, alignment=True, ghost_layers=0)
dh.fill("g", 1.0, ghost_layers=True)
dh.fill("f", 0.5, ghost_layers=True)
collision = [
ps.Assignment(sp.Symbol("b"), sp.sqrt(sp.Symbol("a") * (1 - (1 - f.center)**2))),
ps.Assignment(g.center, sp.Symbol("b"))
]
config = ps.CreateKernelConfig(
target=ps.Target.CPU,
data_type=data_type,
default_number_float=data_type,
cpu_vectorize_info={
'assume_aligned': assume_aligned,
'assume_inner_stride_one': assume_inner_stride_one,
'assume_sufficient_line_padding': assume_sufficient_line_padding,
},
ghost_layers=0
)
ast = ps.create_kernel(collision, config=config)
print(ast)
code = ps.get_code_str(ast)
print(code)
kernel = ast.compile()
dh.run_kernel(kernel, a=4)
g_arr = dh.cpu_arrays['g']
np.testing.assert_allclose(g_arr, np.full_like(g_arr, np.sqrt(3)))
def test_vectorize_moved_constants1(instruction_set=instruction_set):
opt = {'instruction_set': instruction_set, 'assume_inner_stride_one': True}
f = ps.fields("f: [1D]")
x = ast.TypedSymbol("x", np.float64)
kernel_func = ps.create_kernel(
[ast.SympyAssignment(x, 2.0), ast.SympyAssignment(f[0], x)],
cpu_prepend_optimizations=[ps.transformations.move_constants_before_loop], # explicitly move constants
cpu_vectorize_info=opt,
)
ps.show_code(kernel_func) # fails if `x` on rhs was not correctly vectorized
kernel = kernel_func.compile()
f_arr = np.zeros(9)
kernel(f=f_arr)
assert(np.all(f_arr == 2))
def test_vectorize_moved_constants2(instruction_set=instruction_set):
opt = {'instruction_set': instruction_set, 'assume_inner_stride_one': True}
f = ps.fields("f: [1D]")
x = ast.TypedSymbol("x", np.float64)
y = ast.TypedSymbol("y", np.float64)
kernel_func = ps.create_kernel(
[ast.SympyAssignment(x, 2.0), ast.SympyAssignment(y, 3.0), ast.SympyAssignment(f[0], x + y)],
cpu_prepend_optimizations=[ps.transformations.move_constants_before_loop], # explicitly move constants
cpu_vectorize_info=opt,
)
ps.show_code(kernel_func) # fails if `x` on rhs was not correctly vectorized
kernel = kernel_func.compile()
f_arr = np.zeros(9)
kernel(f=f_arr)
assert(np.all(f_arr == 5))
@pytest.mark.parametrize('openmp', [True, False])
def test_aligned_and_nt_stores(openmp, instruction_set=instruction_set):
domain_size = (24, 24)
# create a datahandling object
dh = ps.create_data_handling(domain_size, periodicity=(True, True), parallel=False, default_target=Target.CPU)
# fields
alignment = 'cacheline' if openmp else True
g = dh.add_array("g", values_per_cell=1, alignment=alignment)
dh.fill("g", 1.0, ghost_layers=True)
f = dh.add_array("f", values_per_cell=1, alignment=alignment)
dh.fill("f", 0.0, ghost_layers=True)
opt = {'instruction_set': instruction_set, 'assume_aligned': True, 'nontemporal': True,
'assume_inner_stride_one': True}
update_rule = [ps.Assignment(f.center(), 0.25 * (g[-1, 0] + g[1, 0] + g[0, -1] + g[0, 1]))]
# Without the base pointer spec, the inner store is not aligned
config = pystencils.config.CreateKernelConfig(target=dh.default_target, cpu_vectorize_info=opt, cpu_openmp=openmp)
ast = ps.create_kernel(update_rule, config=config)
if instruction_set in ['sse'] or instruction_set.startswith('avx') or instruction_set.startswith('sve'):
assert 'stream' in ast.instruction_set
assert 'streamFence' in ast.instruction_set
if instruction_set in ['neon', 'vsx', 'rvv']:
assert 'cachelineZero' in ast.instruction_set
if instruction_set in ['vsx']:
assert 'storeAAndFlushCacheline' in ast.instruction_set
for instruction in ['stream', 'streamFence', 'cachelineZero', 'storeAAndFlushCacheline', 'flushCacheline']:
if instruction in ast.instruction_set:
assert ast.instruction_set[instruction].split('{')[0] in ps.get_code_str(ast)
kernel = ast.compile()
# ps.show_code(ast)
dh.run_kernel(kernel)
np.testing.assert_equal(np.sum(dh.cpu_arrays['f']), np.prod(domain_size))
def test_nt_stores_symbolic_size(instruction_set=instruction_set):
f, g = ps.fields('f, g: [2D]', layout='fzyx')
update_rule = [ps.Assignment(f.center(), 0.0), ps.Assignment(g.center(), 0.0)]
opt = {'instruction_set': instruction_set, 'assume_aligned': True, 'nontemporal': True,
'assume_inner_stride_one': True}
config = pystencils.config.CreateKernelConfig(target=Target.CPU, cpu_vectorize_info=opt)
ast = ps.create_kernel(update_rule, config=config)
# ps.show_code(ast)
ast.compile()
def test_inplace_update(instruction_set=instruction_set):
shape = (9, 9, 3)
arr = np.ones(shape, order='f')
@ps.kernel
def update_rule(s):
f = ps.fields("f(3) : [2D]", f=arr)
s.tmp0 @= f(0)
s.tmp1 @= f(1)
s.tmp2 @= f(2)
f0, f1, f2 = f(0), f(1), f(2)
f0 @= 2 * s.tmp0
f1 @= 2 * s.tmp0
f2 @= 2 * s.tmp0
config = pystencils.config.CreateKernelConfig(cpu_vectorize_info={'instruction_set': instruction_set})
ast = ps.create_kernel(update_rule, config=config)
kernel = ast.compile()
kernel(f=arr)
np.testing.assert_equal(arr, 2)
def test_vectorization_fixed_size(instruction_set=instruction_set):
instructions = get_vector_instruction_set(instruction_set=instruction_set)
configurations = []
# Fixed size - multiple of four
arr = np.ones((20 + 2, 24 + 2)) * 5.0
f, g = ps.fields(f=arr, g=arr)
configurations.append((arr, f, g))
# Fixed size - no multiple of four
arr = np.ones((21 + 2, 25 + 2)) * 5.0
f, g = ps.fields(f=arr, g=arr)
configurations.append((arr, f, g))
# Fixed size - different remainder
arr = np.ones((23 + 2, 17 + 2)) * 5.0
f, g = ps.fields(f=arr, g=arr)
configurations.append((arr, f, g))
for arr, f, g in configurations:
update_rule = [ps.Assignment(g[0, 0], f[0, 0] + f[-1, 0] + f[1, 0] + f[0, 1] + f[0, -1] + 42.0)]
ast = ps.create_kernel(update_rule)
vectorize(ast, instruction_set=instruction_set)
code = ps.get_code_str(ast)
add_instruction = instructions["+"][:instructions["+"].find("(")]
assert add_instruction in code
# print(code)
func = ast.compile()
dst = np.zeros_like(arr)
func(g=dst, f=arr)
np.testing.assert_equal(dst[1:-1, 1:-1], 5 * 5.0 + 42.0)
def test_vectorization_variable_size(instruction_set=instruction_set):
f, g = ps.fields("f, g : double[2D]")
update_rule = [ps.Assignment(g[0, 0], f[0, 0] + f[-1, 0] + f[1, 0] + f[0, 1] + f[0, -1] + 42.0)]
ast = ps.create_kernel(update_rule)
replace_inner_stride_with_one(ast)
vectorize(ast, instruction_set=instruction_set)
func = ast.compile()
arr = np.ones((23 + 2, 17 + 2)) * 5.0
dst = np.zeros_like(arr)
func(g=dst, f=arr)
np.testing.assert_equal(dst[1:-1, 1:-1], 5 * 5.0 + 42.0)
def test_piecewise1(instruction_set=instruction_set):
a, b, c, d, e = sp.symbols("a b c d e")
arr = np.ones((2 ** 3 + 2, 2 ** 4 + 2)) * 5.0
f, g = ps.fields(f=arr, g=arr)
update_rule = [ps.Assignment(a, f[1, 0]),
ps.Assignment(b, a),
ps.Assignment(c, f[0, 0] > 0.0),
ps.Assignment(g[0, 0], sp.Piecewise((b + 3 + f[0, 1], c), (0.0, True)))]
ast = ps.create_kernel(update_rule)
vectorize(ast, instruction_set=instruction_set)
func = ast.compile()
dst = np.zeros_like(arr)
func(g=dst, f=arr)
np.testing.assert_equal(dst[1:-1, 1:-1], 5 + 3 + 5.0)
def test_piecewise2(instruction_set=instruction_set):
arr = np.zeros((20, 20))
@ps.kernel
def test_kernel(s):
f, g = ps.fields(f=arr, g=arr)
s.condition @= f[0, 0] > 1
s.result @= 0.0 if s.condition else 1.0
g[0, 0] @= s.result
ast = ps.create_kernel(test_kernel)
# ps.show_code(ast)
vectorize(ast, instruction_set=instruction_set)
# ps.show_code(ast)
func = ast.compile()
func(f=arr, g=arr)
np.testing.assert_equal(arr, np.ones_like(arr))
def test_piecewise3(instruction_set=instruction_set):
arr = np.zeros((22, 22))
@ps.kernel
def test_kernel(s):
f, g = ps.fields(f=arr, g=arr)
s.b @= f[0, 1]
g[0, 0] @= 1.0 / (s.b + s.k) if f[0, 0] > 0.0 else 1.0
ast = ps.create_kernel(test_kernel)
# ps.show_code(ast)
vectorize(ast, instruction_set=instruction_set)
# ps.show_code(ast)
ast.compile()
def test_logical_operators(instruction_set=instruction_set):
arr = np.zeros((22, 22))
@ps.kernel
def kernel_and(s):
f, g = ps.fields(f=arr, g=arr)
s.c @= sp.And(f[0, 1] < 0.0, f[1, 0] < 0.0)
g[0, 0] @= sp.Piecewise([1.0 / f[1, 0], s.c], [1.0, True])
ast = ps.create_kernel(kernel_and)
vectorize(ast, instruction_set=instruction_set)
ast.compile()
@ps.kernel
def kernel_or(s):
f, g = ps.fields(f=arr, g=arr)
s.c @= sp.Or(f[0, 1] < 0.0, f[1, 0] < 0.0)
g[0, 0] @= sp.Piecewise([1.0 / f[1, 0], s.c], [1.0, True])
ast = ps.create_kernel(kernel_or)
vectorize(ast, instruction_set=instruction_set)
ast.compile()
@ps.kernel
def kernel_equal(s):
f, g = ps.fields(f=arr, g=arr)
s.c @= sp.Eq(f[0, 1], 2.0)
g[0, 0] @= sp.Piecewise([1.0 / f[1, 0], s.c], [1.0, True])
ast = ps.create_kernel(kernel_equal)
vectorize(ast, instruction_set=instruction_set)
ast.compile()
def test_hardware_query():
assert {'sse', 'neon', 'sve', 'sve2', 'sme', 'vsx', 'rvv'}.intersection(supported_instruction_sets)
def test_vectorised_pow(instruction_set=instruction_set):
arr = np.zeros((24, 24))
f, g = ps.fields(f=arr, g=arr)
as1 = ps.Assignment(g[0, 0], sp.Pow(f[0, 0], 2))
as2 = ps.Assignment(g[0, 0], sp.Pow(f[0, 0], 0.5))
as3 = ps.Assignment(g[0, 0], sp.Pow(f[0, 0], -0.5))
as4 = ps.Assignment(g[0, 0], sp.Pow(f[0, 0], 4))
as5 = ps.Assignment(g[0, 0], sp.Pow(f[0, 0], -4))
as6 = ps.Assignment(g[0, 0], sp.Pow(f[0, 0], -1))
ast = ps.create_kernel(as1)
vectorize(ast, instruction_set=instruction_set)
ast.compile()
ast = ps.create_kernel(as2)
vectorize(ast, instruction_set=instruction_set)
ast.compile()
ast = ps.create_kernel(as3)
vectorize(ast, instruction_set=instruction_set)
ast.compile()
ast = ps.create_kernel(as4)
vectorize(ast, instruction_set=instruction_set)
ast.compile()
ast = ps.create_kernel(as5)
vectorize(ast, instruction_set=instruction_set)
ast.compile()
ast = ps.create_kernel(as6)
vectorize(ast, instruction_set=instruction_set)
ast.compile()
def test_issue40(*_):
"""https://i10git.cs.fau.de/pycodegen/pystencils/-/issues/40"""
opt = {'instruction_set': "avx512", 'assume_aligned': False,
'nontemporal': False, 'assume_inner_stride_one': True}
src = ps.fields("src(1): double[2D]", layout='fzyx')
eq = [ps.Assignment(sp.Symbol('rho'), 1.0),
ps.Assignment(src[0, 0](0), sp.Rational(4, 9) * sp.Symbol('rho'))]
config = pystencils.config.CreateKernelConfig(target=Target.CPU, cpu_vectorize_info=opt, data_type='float64')
ast = ps.create_kernel(eq, config=config)
code = ps.get_code_str(ast)
assert 'epi32' not in code
import pytest
import numpy as np
import pystencils.config
import sympy as sp
import pystencils as ps
from pystencils.backends.simd_instruction_sets import (get_cacheline_size, get_supported_instruction_sets,
get_vector_instruction_set)
from pystencils.enums import Target
from pystencils.typing import CFunction
from . import test_vectorization
supported_instruction_sets = get_supported_instruction_sets() if get_supported_instruction_sets() else []
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
def test_vectorisation_varying_arch(instruction_set):
shape = (9, 9, 3)
arr = np.ones(shape, order='f')
@ps.kernel
def update_rule(s):
f = ps.fields("f(3) : [2D]", f=arr)
s.tmp0 @= f(0)
s.tmp1 @= f(1)
s.tmp2 @= f(2)
f0, f1, f2 = f(0), f(1), f(2)
f0 @= 2 * s.tmp0
f1 @= 2 * s.tmp0
f2 @= 2 * s.tmp0
config = pystencils.config.CreateKernelConfig(cpu_vectorize_info={'instruction_set': instruction_set})
ast = ps.create_kernel(update_rule, config=config)
kernel = ast.compile()
kernel(f=arr)
np.testing.assert_equal(arr, 2)
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
def test_vectorized_abs_field(instruction_set, dtype):
"""Some instructions sets have abs, some don't.
Furthermore, the special treatment of unary minus makes this data type-sensitive too.
"""
arr = np.ones((2 ** 2 + 2, 2 ** 3 + 2), dtype=dtype)
arr[-3:, :] = -1
f, g = ps.fields(f=arr, g=arr)
update_rule = [ps.Assignment(g.center(), sp.Abs(f.center()))]
config = pystencils.config.CreateKernelConfig(cpu_vectorize_info={'instruction_set': instruction_set})
ast = ps.create_kernel(update_rule, config=config)
func = ast.compile()
dst = np.zeros_like(arr)
func(g=dst, f=arr)
np.testing.assert_equal(np.sum(dst[1:-1, 1:-1]), 2 ** 2 * 2 ** 3)
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
def test_vectorized_abs_scalar(instruction_set):
"""Some instructions sets have abs, some don't.
Furthermore, the special treatment of unary minus makes this data type-sensitive too.
"""
arr = np.zeros((2 ** 2 + 2, 2 ** 3 + 2), dtype="float64")
f = ps.fields(f=arr)
update_rule = [ps.Assignment(f.center(), sp.Abs(sp.Symbol("a")))]
config = pystencils.config.CreateKernelConfig(cpu_vectorize_info={'instruction_set': instruction_set})
ast = ps.create_kernel(update_rule, config=config)
func = ast.compile()
func(f=arr, a=-1)
np.testing.assert_equal(np.sum(arr[1:-1, 1:-1]), 2 ** 2 * 2 ** 3)
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
@pytest.mark.parametrize('nontemporal', [False, True])
def test_strided(instruction_set, dtype, nontemporal):
f, g = ps.fields(f"f, g : {dtype}[2D]")
update_rule = [ps.Assignment(g[0, 0], f[0, 0] + f[-1, 0] + f[1, 0] + f[0, 1] + f[0, -1] + 42.0)]
config = pystencils.config.CreateKernelConfig(cpu_vectorize_info={'instruction_set': instruction_set,
'nontemporal': nontemporal},
default_number_float=dtype)
if 'storeS' not in get_vector_instruction_set(dtype, instruction_set) \
and instruction_set not in ['avx512', 'avx512vl', 'rvv'] and not instruction_set.startswith('sve'):
with pytest.warns(UserWarning) as warn:
ast = ps.create_kernel(update_rule, config=config)
assert 'Could not vectorize loop' in warn[0].message.args[0]
else:
with pytest.warns(None) as warn:
ast = ps.create_kernel(update_rule, config=config)
assert len(warn) == 0
instruction = 'streamS' if nontemporal and 'streamS' in ast.instruction_set else 'storeS'
assert ast.instruction_set[instruction].split('{')[0] in ps.get_code_str(ast)
instruction = 'cachelineZero'
if instruction in ast.instruction_set:
assert ast.instruction_set[instruction] not in ps.get_code_str(ast)
# ps.show_code(ast)
func = ast.compile()
ref_config = pystencils.config.CreateKernelConfig(default_number_float=dtype)
ref_func = ps.create_kernel(update_rule, config=ref_config).compile()
# For some reason other array creations fail on the emulated ppc pipeline
size = (25, 19)
arr = np.zeros(size).astype(dtype)
for i in range(size[0]):
for j in range(size[1]):
arr[i, j] = i * j
dst = np.zeros_like(arr, dtype=dtype)
ref = np.zeros_like(arr, dtype=dtype)
func(g=dst, f=arr)
ref_func(g=ref, f=arr)
# print("dst: ", dst)
# print("np array: ", arr)
np.testing.assert_almost_equal(dst[1:-1, 1:-1], ref[1:-1, 1:-1], 13 if dtype == 'float64' else 5)
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
@pytest.mark.parametrize('gl_field, gl_kernel', [(1, 0), (0, 1), (1, 1)])
def test_alignment_and_correct_ghost_layers(gl_field, gl_kernel, instruction_set, dtype):
domain_size = (128, 128)
dh = ps.create_data_handling(domain_size, periodicity=(True, True), default_target=Target.CPU)
src = dh.add_array("src", values_per_cell=1, dtype=dtype, ghost_layers=gl_field, alignment=True)
dh.fill(src.name, 1.0, ghost_layers=True)
dst = dh.add_array("dst", values_per_cell=1, dtype=dtype, ghost_layers=gl_field, alignment=True)
dh.fill(dst.name, 1.0, ghost_layers=True)
update_rule = ps.Assignment(dst[0, 0], src[0, 0])
opt = {'instruction_set': instruction_set, 'assume_aligned': True,
'nontemporal': True, 'assume_inner_stride_one': True}
config = pystencils.config.CreateKernelConfig(target=dh.default_target,
cpu_vectorize_info=opt, ghost_layers=gl_kernel)
ast = ps.create_kernel(update_rule, config=config)
kernel = ast.compile()
if ('loadA' in ast.instruction_set or 'storeA' in ast.instruction_set) and gl_kernel != gl_field:
with pytest.raises(ValueError):
dh.run_kernel(kernel)
else:
dh.run_kernel(kernel)
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
def test_cacheline_size(instruction_set):
cacheline_size = get_cacheline_size(instruction_set)
if cacheline_size is None and instruction_set in ['sse', 'avx', 'avx512', 'avx512vl', 'rvv']:
pytest.skip()
instruction_set = get_vector_instruction_set('double', instruction_set)
vector_size = instruction_set['bytes']
assert 8 < cacheline_size < 0x100000, "Cache line size is implausible"
if type(vector_size) is int:
assert cacheline_size % vector_size == 0, "Cache line size should be multiple of vector size"
assert cacheline_size & (cacheline_size - 1) == 0, "Cache line size is not a power of 2"
# TODO move to vectorise
@pytest.mark.parametrize('instruction_set',
sorted(set(supported_instruction_sets) - {test_vectorization.instruction_set}))
@pytest.mark.parametrize('function',
[f for f in test_vectorization.__dict__ if f.startswith('test_') and f not in ['test_hardware_query', 'test_aligned_and_nt_stores']])
def test_vectorization_other(instruction_set, function):
test_vectorization.__dict__[function](instruction_set)
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
@pytest.mark.parametrize('field_layout', ('fzyx', 'zyxf'))
def test_square_root(dtype, instruction_set, field_layout):
config = pystencils.config.CreateKernelConfig(data_type=dtype,
default_number_float=dtype,
cpu_vectorize_info={'instruction_set': instruction_set,
'assume_inner_stride_one': True,
'assume_aligned': False,
'nontemporal': False})
src_field = ps.Field.create_generic('pdfs', 2, dtype, index_dimensions=1, layout=field_layout, index_shape=(9,))
eq = [ps.Assignment(sp.Symbol("xi"), sum(src_field.center_vector)),
ps.Assignment(sp.Symbol("xi_2"), sp.Symbol("xi") * sp.sqrt(src_field.center))]
ast = ps.create_kernel(eq, config=config)
ast.compile()
code = ps.get_code_str(ast)
print(code)
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
@pytest.mark.parametrize('padding', (True, False))
def test_square_root_2(dtype, instruction_set, padding):
x, y = sp.symbols("x y")
src = ps.fields(f"src: {dtype}[2D]", layout='fzyx')
up = ps.Assignment(src[0, 0], 1 / x + sp.sqrt(y * 0.52 + x ** 2))
cpu_vec = {'instruction_set': instruction_set, 'assume_inner_stride_one': True,
'assume_sufficient_line_padding': padding,
'assume_aligned': True}
config = ps.CreateKernelConfig(data_type=dtype, default_number_float=dtype, cpu_vectorize_info=cpu_vec)
ast = ps.create_kernel(up, config=config)
ast.compile()
code = ps.get_code_str(ast)
print(code)
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
@pytest.mark.parametrize('padding', (True, False))
def test_pow(dtype, instruction_set, padding):
config = pystencils.config.CreateKernelConfig(data_type=dtype,
default_number_float=dtype,
cpu_vectorize_info={'instruction_set': instruction_set,
'assume_inner_stride_one': True,
'assume_sufficient_line_padding': padding,
'assume_aligned': False, 'nontemporal': False})
src_field = ps.Field.create_generic('pdfs', 2, dtype, index_dimensions=1, layout='fzyx', index_shape=(9,))
eq = [ps.Assignment(sp.Symbol("xi"), sum(src_field.center_vector)),
ps.Assignment(sp.Symbol("xi_2"), sp.Symbol("xi") * sp.Pow(src_field.center, 0.5))]
ast = ps.create_kernel(eq, config=config)
ast.compile()
code = ps.get_code_str(ast)
print(code)
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
@pytest.mark.parametrize('padding', (True, False))
def test_issue62(dtype, instruction_set, padding):
opt = {'instruction_set': instruction_set, 'assume_aligned': True,
'assume_inner_stride_one': True,
'assume_sufficient_line_padding': padding}
dx = sp.Symbol("dx")
dy = sp.Symbol("dy")
src, dst, rhs = ps.fields(f"src, src_tmp, rhs: {dtype}[2D]", layout='fzyx')
up = ps.Assignment(dst[0, 0], ((dy ** 2 * (src[1, 0] + src[-1, 0]))
+ (dx ** 2 * (src[0, 1] + src[0, -1]))
- (rhs[0, 0] * dx ** 2 * dy ** 2)) / (2 * (dx ** 2 + dy ** 2)))
config = ps.CreateKernelConfig(data_type=dtype,
default_number_float=dtype,
cpu_vectorize_info=opt)
ast = ps.create_kernel(up, config=config)
ast.compile()
code = ps.get_code_str(ast)
print(code)
assert 'pow' not in code
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
def test_div_and_unevaluated_expr(dtype, instruction_set):
opt = {'instruction_set': instruction_set, 'assume_aligned': True, 'assume_inner_stride_one': True,
'assume_sufficient_line_padding': False}
x, y, z = sp.symbols("x y z")
rhs = (-4 * x ** 4 * y ** 2 * z ** 2 + (x ** 4 * y ** 2 / 3) + (x ** 4 * z ** 2 / 3)) / x ** 3
src = ps.fields(f"src: {dtype}[2D]", layout='fzyx')
up = ps.Assignment(src[0, 0], rhs)
config = ps.CreateKernelConfig(data_type=dtype,
default_number_float=dtype,
cpu_vectorize_info=opt)
ast = ps.create_kernel(up, config=config)
code = ps.get_code_str(ast)
# print(code)
ast.compile()
assert 'pow' not in code
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('instruction_set', ('sve', 'sve2', 'sme', 'rvv'))
def test_check_ast_parameters_sizeless(dtype, instruction_set):
f, g = ps.fields(f"f, g: {dtype}[3D]", layout='fzyx')
update_rule = [ps.Assignment(g.center(), 2 * f.center())]
config = pystencils.config.CreateKernelConfig(data_type=dtype,
cpu_vectorize_info={'instruction_set': instruction_set})
ast = ps.create_kernel(update_rule, config=config)
ast_symbols = [p.symbol for p in ast.get_parameters()]
assert ast.instruction_set['width'] not in ast_symbols
assert ast.instruction_set['intwidth'] not in ast_symbols
# TODO this test case needs a complete rework of the vectoriser. The reason is that the vectoriser does not
# TODO vectorise symbols at the moment because they could be strides or field sizes, thus involved in pointer arithmetic
# TODO This means that the vectoriser only works if fields are involved on the rhs.
# @pytest.mark.parametrize('dtype', ('float32', 'float64'))
# @pytest.mark.parametrize('instruction_set', supported_instruction_sets)
# def test_vectorised_symbols(dtype, instruction_set):
# opt = {'instruction_set': instruction_set, 'assume_aligned': True, 'assume_inner_stride_one': True,
# 'assume_sufficient_line_padding': False}
#
# x, y, z = sp.symbols("x y z")
# rhs = 1 / x ** 2 * (x + y)
#
# src = ps.fields(f"src: {dtype}[2D]", layout='fzyx')
#
# up = ps.Assignment(src[0, 0], rhs)
#
# config = ps.CreateKernelConfig(data_type=dtype,
# default_number_float=dtype,
# cpu_vectorize_info=opt)
#
# ast = ps.create_kernel(up, config=config)
# code = ps.get_code_str(ast)
# print(code)
#
# ast.compile()
#
# assert 'pow' not in code
import pystencils as ps
def test_version_string():
version = ps.__version__
print(version)
numeric_version = [int(x, 10) for x in version.split('.')[0:1]]
test_version = sum(x * (100 ** i) for i, x in enumerate(numeric_version))
assert test_version >= 1