Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Showing
with 2192 additions and 53 deletions
import sympy as sp import sympy as sp
from sympy.abc import a, b, t, x, y, z
from sympy.printing.latex import LatexPrinter from sympy.printing.latex import LatexPrinter
import pystencils as ps
from pystencils.fd import * from pystencils.fd import *
def test_derivative_basic(): def test_derivative_basic():
x, y, z, t = sp.symbols("x y z t")
d = diff d = diff
op1, op2, op3 = DiffOperator(), DiffOperator(target=x), DiffOperator(target=x, superscript=1) op1, op2, op3 = DiffOperator(), DiffOperator(target=x), DiffOperator(target=x, superscript=1)
...@@ -18,4 +20,31 @@ def test_derivative_basic(): ...@@ -18,4 +20,31 @@ def test_derivative_basic():
assert diff_term == dx**2 + 2 * dx * dy + dy**2 + 1 assert diff_term == dx**2 + 2 * dx * dy + dy**2 + 1
assert DiffOperator.apply(diff_term, t) == d(t, x, x) + 2 * d(t, x, y) + d(t, y, y) + t assert DiffOperator.apply(diff_term, t) == d(t, x, x) + 2 * d(t, x, y) + d(t, y, y) + t
assert ps.fd.Diff(0) == 0
expr = ps.fd.diff(ps.fd.diff(x, 0, 0), 1, 1)
assert expr.get_arg_recursive() == x
assert expr.change_arg_recursive(y).get_arg_recursive() == y
def test_derivative_expand_collect():
original = Diff(x*y*z)
result = combine_diff_products(combine_diff_products(expand_diff_products(original))).expand()
assert original == result
original = -3 * y * z * Diff(x) + 2 * x * z * Diff(y)
result = expand_diff_products(combine_diff_products(original)).expand()
assert original == result
original = a + b * Diff(x ** 2 * y * z)
expanded = expand_diff_products(original)
collect_res = combine_diff_products(combine_diff_products(combine_diff_products(expanded)))
assert collect_res == original
def test_diff_expand_using_linearity():
eps = sp.symbols("epsilon")
funcs = [a, b]
test = Diff(eps * Diff(a+b))
result = expand_diff_linear(test, functions=funcs)
assert result == eps * Diff(Diff(a)) + eps * Diff(Diff(b))
%% Cell type:code id: tags:
``` python
from pystencils.session import *
from pystencils.astnodes import Block, Conditional, SympyAssignment
```
%% Cell type:code id: tags:
``` python
src, dst = ps.fields("src, dst: double[2D]", layout='c')
true_block = Block([SympyAssignment(dst[0, 0], src[-1, 0])])
false_block = Block([SympyAssignment(dst[0, 0], src[1, 0])])
ur = [true_block, Conditional(dst.center() > 0.0, true_block, false_block)]
ast = ps.create_kernel(ur)
```
%% Cell type:code id: tags:
``` python
ps.show_code(ast)
```
%% Output
%% Cell type:code id: tags:
``` python
```
import pystencils as ps
from pystencils.astnodes import Block, Conditional, SympyAssignment
def test_dot_print():
src, dst = ps.fields("src, dst: double[2D]", layout='c')
true_block = Block([SympyAssignment(dst[0, 0], src[-1, 0])])
false_block = Block([SympyAssignment(dst[0, 0], src[1, 0])])
ur = [true_block, Conditional(dst.center() > 0.0, true_block, false_block)]
ast = ps.create_kernel(ur)
import pystencils
import numpy as np import numpy as np
import pytest import pytest
import pystencils
def test_dtype_check_wrong_type(): def test_dtype_check_wrong_type():
array = np.ones((10, 20)).astype(np.float32) array = np.ones((10, 20)).astype(np.float32)
...@@ -15,7 +16,7 @@ def test_dtype_check_wrong_type(): ...@@ -15,7 +16,7 @@ def test_dtype_check_wrong_type():
with pytest.raises(ValueError) as e: with pytest.raises(ValueError) as e:
kernel(x=array, y=output) kernel(x=array, y=output)
assert 'Wrong data type' in str(e) assert 'Wrong data type' in str(e.value)
def test_dtype_check_correct_type(): def test_dtype_check_correct_type():
......
import pytest
import sympy as sp import sympy as sp
import pystencils as ps import pystencils as ps
from pystencils.fast_approximation import insert_fast_divisions, insert_fast_sqrts, fast_sqrt, fast_inv_sqrt, \ from pystencils.fast_approximation import (
fast_division fast_division, fast_inv_sqrt, fast_sqrt, insert_fast_divisions, insert_fast_sqrts)
def test_fast_sqrt(): def test_fast_sqrt():
pytest.importorskip('cupy')
f, g = ps.fields("f, g: double[2D]") f, g = ps.fields("f, g: double[2D]")
expr = sp.sqrt(f[0, 0] + f[1, 0]) expr = sp.sqrt(f[0, 0] + f[1, 0])
assert len(insert_fast_sqrts(expr).atoms(fast_sqrt)) == 1 assert len(insert_fast_sqrts(expr).atoms(fast_sqrt)) == 1
assert len(insert_fast_sqrts([expr])[0].atoms(fast_sqrt)) == 1 assert len(insert_fast_sqrts([expr])[0].atoms(fast_sqrt)) == 1
ast_gpu = ps.create_kernel(ps.Assignment(g[0, 0], insert_fast_sqrts(expr)), target=ps.Target.GPU)
ast_gpu.compile()
code_str = ps.get_code_str(ast_gpu)
assert '__fsqrt_rn' in code_str
expr = 3 / sp.sqrt(f[0, 0] + f[1, 0]) expr = ps.Assignment(sp.Symbol("tmp"), 3 / sp.sqrt(f[0, 0] + f[1, 0]))
assert len(insert_fast_sqrts(expr).atoms(fast_inv_sqrt)) == 1 assert len(insert_fast_sqrts(expr).atoms(fast_inv_sqrt)) == 1
ac = ps.AssignmentCollection([expr], []) ac = ps.AssignmentCollection([expr], [])
assert len(insert_fast_sqrts(ac).main_assignments[0].atoms(fast_inv_sqrt)) == 1 assert len(insert_fast_sqrts(ac).main_assignments[0].atoms(fast_inv_sqrt)) == 1
ast_gpu = ps.create_kernel(insert_fast_sqrts(ac), target=ps.Target.GPU)
ast_gpu.compile()
code_str = ps.get_code_str(ast_gpu)
assert '__frsqrt_rn' in code_str
def test_fast_divisions(): def test_fast_divisions():
pytest.importorskip('cupy')
f, g = ps.fields("f, g: double[2D]") f, g = ps.fields("f, g: double[2D]")
expr = f[0, 0] / f[1, 0] expr = f[0, 0] / f[1, 0]
assert len(insert_fast_divisions(expr).atoms(fast_division)) == 1 assert len(insert_fast_divisions(expr).atoms(fast_division)) == 1
...@@ -25,6 +38,7 @@ def test_fast_divisions(): ...@@ -25,6 +38,7 @@ def test_fast_divisions():
expr = 1 / f[0, 0] * 2 / f[0, 1] expr = 1 / f[0, 0] * 2 / f[0, 1]
assert len(insert_fast_divisions(expr).atoms(fast_division)) == 1 assert len(insert_fast_divisions(expr).atoms(fast_division)) == 1
ast = ps.create_kernel(ps.Assignment(g[0, 0], insert_fast_divisions(expr)), target='gpu') ast = ps.create_kernel(ps.Assignment(g[0, 0], insert_fast_divisions(expr)), target=ps.Target.GPU)
code_str = str(ps.show_code(ast)) ast.compile()
code_str = ps.get_code_str(ast)
assert '__fdividef' in code_str assert '__fdividef' in code_str
%% Cell type:code id: tags:
``` python
from pystencils.session import *
from pystencils.fd.derivation import *
```
%% Cell type:markdown id: tags:
# 2D standard stencils
%% Cell type:code id: tags:
``` python
stencil = [(-1, 0), (1, 0), (0, -1), (0, 1), (0, 0)]
standard_2d_00 = FiniteDifferenceStencilDerivation((0,0), stencil)
f = ps.fields("f: [2D]")
standard_2d_00_res = standard_2d_00.get_stencil()
res = standard_2d_00_res.apply(f.center)
expected = f[-1, 0] - 2 * f[0, 0] + f[1, 0]
assert res == expected
```
%% Cell type:code id: tags:
``` python
assert standard_2d_00_res.accuracy == 2
assert not standard_2d_00_res.is_isotropic
standard_2d_00_res
```
%% Output
Finite difference stencil of accuracy 2, isotropic error: False
%% Cell type:code id: tags:
``` python
standard_2d_00.get_stencil().as_array()
```
%% Output
$\displaystyle \left[\begin{matrix}0 & 0 & 0\\1 & -2 & 1\\0 & 0 & 0\end{matrix}\right]$
[[0, 0, 0], [1, -2, 1], [0, 0, 0]]
%% Cell type:markdown id: tags:
# 2D isotropic stencils
## second x-derivative
%% Cell type:code id: tags:
``` python
stencil = [(i, j) for i in (-1, 0, 1) for j in (-1, 0, 1)]
isotropic_2d_00 = FiniteDifferenceStencilDerivation((0,0), stencil)
isotropic_2d_00_res = isotropic_2d_00.get_stencil(isotropic=True)
assert isotropic_2d_00_res.is_isotropic
assert isotropic_2d_00_res.accuracy == 2
isotropic_2d_00_res
```
%% Output
Finite difference stencil of accuracy 2, isotropic error: True
%% Cell type:code id: tags:
``` python
isotropic_2d_00_res.as_array()
```
%% Output
$\displaystyle \left[\begin{matrix}\frac{1}{12} & - \frac{1}{6} & \frac{1}{12}\\\frac{5}{6} & - \frac{5}{3} & \frac{5}{6}\\\frac{1}{12} & - \frac{1}{6} & \frac{1}{12}\end{matrix}\right]$
[[1/12, -1/6, 1/12], [5/6, -5/3, 5/6], [1/12, -1/6, 1/12]]
%% Cell type:code id: tags:
``` python
plt.figure(figsize=(2,2))
isotropic_2d_00_res.visualize()
```
%% Output
%% Cell type:code id: tags:
``` python
expected_result = sp.Array([[1, -2, 1], [10, -20, 10], [1, -2, 1]]) / 12
assert expected_result == isotropic_2d_00_res.as_array()
```
%% Cell type:markdown id: tags:
## Isotropic laplacian
%% Cell type:code id: tags:
``` python
isotropic_2d_11 = FiniteDifferenceStencilDerivation((1,1), stencil)
isotropic_2d_11_res = isotropic_2d_11.get_stencil(isotropic=True)
iso_laplacian = isotropic_2d_00_res.as_array() + isotropic_2d_11_res.as_array()
iso_laplacian
```
%% Output
$\displaystyle \left[\begin{matrix}\frac{1}{6} & \frac{2}{3} & \frac{1}{6}\\\frac{2}{3} & - \frac{10}{3} & \frac{2}{3}\\\frac{1}{6} & \frac{2}{3} & \frac{1}{6}\end{matrix}\right]$
[[1/6, 2/3, 1/6], [2/3, -10/3, 2/3], [1/6, 2/3, 1/6]]
%% Cell type:code id: tags:
``` python
expected_result = sp.Array([[1, 4, 1], [4, -20, 4], [1, 4, 1]]) / 6
assert iso_laplacian == expected_result
```
%% Cell type:markdown id: tags:
# stencils for staggered fields
%% Cell type:code id: tags:
``` python
half = sp.Rational(1, 2)
fd_points_ex = (
(half, 0),
(-half, 0),
(half, 1),
(half, -1),
(-half, 1),
(-half, -1)
)
assert set(fd_points_ex) == set(FiniteDifferenceStaggeredStencilDerivation("E", 2).stencil)
fd_points_ey = (
(0, half),
(0, -half),
(-1,-half),
(-1, half),
(1, -half),
(1, half)
)
assert set(fd_points_ey) == set(FiniteDifferenceStaggeredStencilDerivation("N",2).stencil)
fd_points_c = (
(half, half),
(-half, -half),
(half, -half),
(-half, half)
)
assert set(fd_points_c) == set(FiniteDifferenceStaggeredStencilDerivation("NE",2).stencil)
assert len(FiniteDifferenceStaggeredStencilDerivation("E",3).points) == 10
assert len(FiniteDifferenceStaggeredStencilDerivation("NE",3).points) == 12
assert len(FiniteDifferenceStaggeredStencilDerivation("TNE",3).points) == 8
```
%% Cell type:code id: tags:
``` python
c = ps.fields("c: [2D]")
c3 = ps.fields("c3: [3D]")
assert FiniteDifferenceStaggeredStencilDerivation("E", 2, (0,)).apply(c.center) == c[1, 0] - c[0, 0]
assert FiniteDifferenceStaggeredStencilDerivation("W", 2, (0,)).apply(c.center) == c[0, 0] - c[-1, 0]
assert FiniteDifferenceStaggeredStencilDerivation("N", 2, (1,)).apply(c.center) == c[0, 1] - c[0, 0]
assert FiniteDifferenceStaggeredStencilDerivation("S", 2, (1,)).apply(c.center) == c[0, 0] - c[0, -1]
assert FiniteDifferenceStaggeredStencilDerivation("E", 3, (0,)).apply(c3.center) == c3[1, 0, 0] - c3[0, 0, 0]
assert FiniteDifferenceStaggeredStencilDerivation("W", 3, (0,)).apply(c3.center) == c3[0, 0, 0] - c3[-1, 0, 0]
assert FiniteDifferenceStaggeredStencilDerivation("N", 3, (1,)).apply(c3.center) == c3[0, 1, 0] - c3[0, 0, 0]
assert FiniteDifferenceStaggeredStencilDerivation("S", 3, (1,)).apply(c3.center) == c3[0, 0, 0] - c3[0, -1, 0]
assert FiniteDifferenceStaggeredStencilDerivation("T", 3, (2,)).apply(c3.center) == c3[0, 0, 1] - c3[0, 0, 0]
assert FiniteDifferenceStaggeredStencilDerivation("B", 3, (2,)).apply(c3.center) == c3[0, 0, 0] - c3[0, 0, -1]
assert FiniteDifferenceStaggeredStencilDerivation("S", 2, (0,)).apply(c.center) == \
(c[1, 0] + c[1, -1] - c[-1, 0] - c[-1, -1])/4
assert FiniteDifferenceStaggeredStencilDerivation("NE", 2, (0,)).apply(c.center) + \
FiniteDifferenceStaggeredStencilDerivation("NE", 2, (1,)).apply(c.center) == c[1, 1] - c[0, 0]
assert FiniteDifferenceStaggeredStencilDerivation("NE", 3, (0,)).apply(c3.center) + \
FiniteDifferenceStaggeredStencilDerivation("NE", 3, (1,)).apply(c3.center) == c3[1, 1, 0] - c3[0, 0, 0]
```
%% Cell type:code id: tags:
``` python
d = FiniteDifferenceStaggeredStencilDerivation("NE", 2, (0, 1))
assert d.apply(c.center) == c[0,0] + c[1,1] - c[1,0] - c[0,1]
d.visualize()
```
%% Output
%% Cell type:code id: tags:
``` python
v3 = ps.fields("v(3): [3D]")
for i in range(*v3.index_shape):
assert FiniteDifferenceStaggeredStencilDerivation("E", 3, (0,)).apply(v3.center_vector[i]) == \
v3[1,0,0](i) - v3[0,0,0](i)
```
%% Cell type:code id: tags:
``` python
from pystencils.session import *
from pystencils.fd.derivation import *
```
%% Cell type:markdown id: tags:
# 2D isotropic stencils
%% Cell type:code id: tags:
``` python
stencil_2D = ((0, 0),
(0, 1), (0, -1), (-1, 0), (1, 0),
(-1, 1), (1, 1), (-1, -1), (1, -1))
f = ps.fields("f: [2D]")
```
%% Cell type:code id: tags:
``` python
Isotropic_Gradient = list()
deriv_x = FiniteDifferenceStencilDerivation((0, ), stencil_2D)
deriv_x.assume_symmetric(0, anti_symmetric=True)
deriv_x.assume_symmetric(1, anti_symmetric=False)
deriv_x.set_weight((0, 0), sp.Integer(0))
deriv_x.set_weight((1, 0), sp.Rational(1, 3))
deriv_x.set_weight((1, 1), sp.Rational(1, 12))
res_x = deriv_x.get_stencil(isotropic=True)
```
%% Cell type:code id: tags:
``` python
Isotropic_Gradient = list()
deriv_y = FiniteDifferenceStencilDerivation((1, ), stencil_2D)
deriv_y.assume_symmetric(0, anti_symmetric=False)
deriv_y.assume_symmetric(1, anti_symmetric=True)
deriv_y.set_weight((0, 0), sp.Integer(0))
deriv_y.set_weight((0, 1), sp.Rational(1, 3))
deriv_y.set_weight((1, 1), sp.Rational(1, 12))
res_y = deriv_y.get_stencil(isotropic=True)
```
%% Cell type:code id: tags:
``` python
assert res_x.apply(f.center) - res_y.rotate_weights_and_apply(f.center, (1, 0)) == 0
assert res_y.apply(f.center) - res_x.rotate_weights_and_apply(f.center, (0, 1)) == 0
```
%% Cell type:markdown id: tags:
# 3D isotropic stencils
%% Cell type:code id: tags:
``` python
stencil_3D = ((0, 0, 0),
(0, 1, 0), (0, -1, 0), (-1, 0, 0), (1, 0, 0), (0, 0, 1), (0, 0, -1),
(-1, 1, 0), (1, 1, 0), (-1, -1, 0), (1, -1, 0),
(0, 1, 1), (0, -1, 1), (-1, 0, 1), (1, 0, 1),
(0, 1, -1), (0, -1, -1), (-1, 0, -1), (1, 0, -1),
(1, 1, 1), (-1, 1, 1), (1, -1, 1), (-1, -1, 1), (1, 1, -1), (-1, 1, -1), (1, -1, -1),
(-1, -1, -1))
f = ps.fields("f: [3D]")
```
%% Cell type:code id: tags:
``` python
deriv_x = FiniteDifferenceStencilDerivation((0, ), stencil_3D)
deriv_x.assume_symmetric(0, anti_symmetric=True)
deriv_x.assume_symmetric(1, anti_symmetric=False)
deriv_x.assume_symmetric(2, anti_symmetric=False)
deriv_x.set_weight((0, 0, 0), sp.Integer(0))
deriv_x.set_weight((1, 1, 1), sp.Rational(1, 3360))
res_x = deriv_x.get_stencil(isotropic=True)
```
%% Cell type:code id: tags:
``` python
deriv_y = FiniteDifferenceStencilDerivation((1, ), stencil_3D)
deriv_y.assume_symmetric(0, anti_symmetric=False)
deriv_y.assume_symmetric(1, anti_symmetric=True)
deriv_y.assume_symmetric(2, anti_symmetric=False)
deriv_y.set_weight((0, 0, 0), sp.Integer(0))
deriv_y.set_weight((1, 1, 1), sp.Rational(1, 3360))
res_y = deriv_y.get_stencil(isotropic=True)
```
%% Cell type:code id: tags:
``` python
deriv_z = FiniteDifferenceStencilDerivation((2, ), stencil_3D)
deriv_z.assume_symmetric(0, anti_symmetric=False)
deriv_z.assume_symmetric(1, anti_symmetric=False)
deriv_z.assume_symmetric(2, anti_symmetric=True)
deriv_z.set_weight((0, 0, 0), sp.Integer(0))
deriv_z.set_weight((1, 1, 1), sp.Rational(1, 3360))
res_z = deriv_z.get_stencil(isotropic=True)
```
%% Cell type:code id: tags:
``` python
assert res_x.apply(f.center) - res_y.rotate_weights_and_apply(f.center, (1, 0)) == 0
assert res_x.apply(f.center) - res_z.rotate_weights_and_apply(f.center, (2, 1)) == 0
assert res_y.apply(f.center) - res_x.rotate_weights_and_apply(f.center, (0, 1)) == 0
assert res_y.apply(f.center) - res_z.rotate_weights_and_apply(f.center, (0, 2)) == 0
assert res_z.apply(f.center) - res_x.rotate_weights_and_apply(f.center, (1, 2)) == 0
assert res_z.apply(f.center) - res_y.rotate_weights_and_apply(f.center, (2, 0)) == 0
```
import sympy as sp
from pystencils import fields
from pystencils.fd import Diff, diff, collect_diffs
from pystencils.fd.derivative import replace_generic_laplacian
def test_fs():
f = sp.Symbol("f", commutative=False)
a = Diff(Diff(Diff(f, 1), 0), 0)
assert a.is_commutative is False
print(str(a))
assert diff(f) == f
x, y = sp.symbols("x, y")
collected_terms = collect_diffs(diff(x, 0, 0))
assert collected_terms == Diff(Diff(x, 0, -1), 0, -1)
src = fields("src : double[2D]")
expr = sp.Add(Diff(Diff(src[0, 0])), 10)
expected = Diff(Diff(src[0, 0], 0, -1), 0, -1) + Diff(Diff(src[0, 0], 1, -1), 1, -1) + 10
result = replace_generic_laplacian(expr, 3)
assert result == expected
\ No newline at end of file
import numpy as np
import pytest
import sympy as sp
import pystencils as ps
from pystencils import TypedSymbol
from pystencils.typing import create_type
from pystencils.field import Field, FieldType, layout_string_to_tuple, spatial_layout_string_to_tuple
def test_field_basic():
f = Field.create_generic("f", spatial_dimensions=2)
assert FieldType.is_generic(f)
assert f["E"] == f[1, 0]
assert f["N"] == f[0, 1]
assert "_" in f.center._latex("dummy")
assert (
f.index_to_physical(index_coordinates=sp.Matrix([0, 0]), staggered=False)[0]
== 0
)
assert (
f.index_to_physical(index_coordinates=sp.Matrix([0, 0]), staggered=False)[1]
== 0
)
assert (
f.physical_to_index(physical_coordinates=sp.Matrix([0, 0]), staggered=False)[0]
== 0
)
assert (
f.physical_to_index(physical_coordinates=sp.Matrix([0, 0]), staggered=False)[1]
== 0
)
f1 = f.new_field_with_different_name("f1")
assert f1.ndim == f.ndim
assert f1.values_per_cell() == f.values_per_cell()
fixed = ps.fields("f(5, 5) : double[20, 20]")
assert fixed.neighbor_vector((1, 1)).shape == (5, 5)
f = Field.create_fixed_size("f", (10, 10), strides=(80, 8), dtype=np.float64)
assert f.spatial_strides == (10, 1)
assert f.index_strides == ()
assert f.center_vector == sp.Matrix([f.center])
f1 = f.new_field_with_different_name("f1")
assert f1.ndim == f.ndim
assert f1.values_per_cell() == f.values_per_cell()
f = Field.create_fixed_size("f", (8, 8, 2, 2), index_dimensions=2)
assert f.center_vector == sp.Matrix([[f(0, 0), f(0, 1)], [f(1, 0), f(1, 1)]])
field_access = f[1, 1]
assert field_access.nr_of_coordinates == 2
assert field_access.offset_name == "NE"
neighbor = field_access.neighbor(coord_id=0, offset=-2)
assert neighbor.offsets == (-1, 1)
assert "_" in neighbor._latex("dummy")
f = Field.create_fixed_size("f", (8, 8, 2, 2, 2), index_dimensions=3)
assert f.center_vector == sp.Array(
[[[f(i, j, k) for k in range(2)] for j in range(2)] for i in range(2)]
)
f = Field.create_generic("f", spatial_dimensions=5, index_dimensions=2)
field_access = f[1, -1, 2, -3, 0](1, 0)
assert field_access.offsets == (1, -1, 2, -3, 0)
assert field_access.index == (1, 0)
def test_error_handling():
struct_dtype = np.dtype([('a', np.int32), ('b', np.float64), ('c', np.uint32)])
Field.create_generic('f', spatial_dimensions=2, index_dimensions=0, dtype=struct_dtype)
with pytest.raises(ValueError) as e:
Field.create_generic(
"f", spatial_dimensions=2, index_dimensions=1, dtype=struct_dtype
)
assert "index dimension" in str(e.value)
arr = np.array([[[(1,) * 3, (2,) * 3, (3,) * 3]] * 2], dtype=struct_dtype)
Field.create_from_numpy_array("f", arr, index_dimensions=0)
with pytest.raises(ValueError) as e:
Field.create_from_numpy_array("f", arr, index_dimensions=1)
assert "Structured arrays" in str(e.value)
arr = np.zeros([3, 3, 3])
Field.create_from_numpy_array("f", arr, index_dimensions=2)
with pytest.raises(ValueError) as e:
Field.create_from_numpy_array("f", arr, index_dimensions=3)
assert "Too many" in str(e.value)
Field.create_fixed_size(
"f", (3, 2, 4), index_dimensions=0, dtype=struct_dtype, layout="reverse_numpy"
)
with pytest.raises(ValueError) as e:
Field.create_fixed_size(
"f",
(3, 2, 4),
index_dimensions=1,
dtype=struct_dtype,
layout="reverse_numpy",
)
assert "Structured arrays" in str(e.value)
f = Field.create_fixed_size("f", (10, 10))
with pytest.raises(ValueError) as e:
f[1]
assert "Wrong number of spatial indices" in str(e.value)
f = Field.create_generic("f", spatial_dimensions=2, index_shape=(3,))
with pytest.raises(ValueError) as e:
f(3)
assert "out of bounds" in str(e.value)
f = Field.create_fixed_size("f", (10, 10, 3, 4), index_dimensions=2)
with pytest.raises(ValueError) as e:
f(3, 0)
assert "out of bounds" in str(e.value)
with pytest.raises(ValueError) as e:
f(1, 0)(1, 0)
assert "Indexing an already indexed" in str(e.value)
with pytest.raises(ValueError) as e:
f(1)
assert "Wrong number of indices" in str(e.value)
with pytest.raises(ValueError) as e:
Field.create_generic("f", spatial_dimensions=2, layout="wrong")
assert "Unknown layout descriptor" in str(e.value)
assert layout_string_to_tuple("fzyx", dim=4) == (3, 2, 1, 0)
with pytest.raises(ValueError) as e:
layout_string_to_tuple("wrong", dim=4)
assert "Unknown layout descriptor" in str(e.value)
def test_decorator_scoping():
dst = ps.fields("dst : double[2D]")
def f1():
a = sp.Symbol("a")
def f2():
b = sp.Symbol("b")
@ps.kernel
def decorated_func():
dst[0, 0] @= a + b
return decorated_func
return f2
assert f1()(), ps.Assignment(dst[0, 0], sp.Symbol("a") + sp.Symbol("b"))
def test_string_creation():
x, y, z = ps.fields(" x(4), y(3,5) z : double[ 3, 47]")
assert x.index_shape == (4,)
assert y.index_shape == (3, 5)
assert z.spatial_shape == (3, 47)
def test_itemsize():
x = ps.fields("x: float32[1d]")
y = ps.fields("y: float64[2d]")
i = ps.fields("i: int16[1d]")
assert x.itemsize == 4
assert y.itemsize == 8
assert i.itemsize == 2
def test_spatial_memory_layout_descriptors():
assert (
spatial_layout_string_to_tuple("AoS", 3)
== spatial_layout_string_to_tuple("aos", 3)
== spatial_layout_string_to_tuple("ZYXF", 3)
== spatial_layout_string_to_tuple("zyxf", 3)
== (2, 1, 0)
)
assert (
spatial_layout_string_to_tuple("SoA", 3)
== spatial_layout_string_to_tuple("soa", 3)
== spatial_layout_string_to_tuple("FZYX", 3)
== spatial_layout_string_to_tuple("fzyx", 3)
== spatial_layout_string_to_tuple("f", 3)
== spatial_layout_string_to_tuple("F", 3)
== (2, 1, 0)
)
assert (
spatial_layout_string_to_tuple("c", 3)
== spatial_layout_string_to_tuple("C", 3)
== (0, 1, 2)
)
assert spatial_layout_string_to_tuple("C", 5) == (0, 1, 2, 3, 4)
with pytest.raises(ValueError):
spatial_layout_string_to_tuple("aos", -1)
with pytest.raises(ValueError):
spatial_layout_string_to_tuple("aos", 4)
def test_memory_layout_descriptors():
assert (
layout_string_to_tuple("AoS", 4)
== layout_string_to_tuple("aos", 4)
== layout_string_to_tuple("ZYXF", 4)
== layout_string_to_tuple("zyxf", 4)
== (2, 1, 0, 3)
)
assert (
layout_string_to_tuple("SoA", 4)
== layout_string_to_tuple("soa", 4)
== layout_string_to_tuple("FZYX", 4)
== layout_string_to_tuple("fzyx", 4)
== layout_string_to_tuple("f", 4)
== layout_string_to_tuple("F", 4)
== (3, 2, 1, 0)
)
assert (
layout_string_to_tuple("c", 4)
== layout_string_to_tuple("C", 4)
== (0, 1, 2, 3)
)
assert layout_string_to_tuple("C", 5) == (0, 1, 2, 3, 4)
with pytest.raises(ValueError):
layout_string_to_tuple("aos", -1)
with pytest.raises(ValueError):
layout_string_to_tuple("aos", 5)
def test_staggered():
# D2Q5
j1, j2, j3 = ps.fields(
"j1(2), j2(2,2), j3(2,2,2) : double[2D]", field_type=FieldType.STAGGERED
)
assert j1[0, 1](1) == j1.staggered_access((0, sp.Rational(1, 2)))
assert j1[0, 1](1) == j1.staggered_access(np.array((0, sp.Rational(1, 2))))
assert j1[1, 1](1) == j1.staggered_access((1, sp.Rational(1, 2)))
assert j1[0, 2](1) == j1.staggered_access((0, sp.Rational(3, 2)))
assert j1[0, 1](1) == j1.staggered_access("N")
assert j1[0, 0](1) == j1.staggered_access("S")
assert j1.staggered_vector_access("N") == sp.Matrix([j1.staggered_access("N")])
assert j1.staggered_stencil_name == "D2Q5"
assert j1.physical_coordinates[0] == TypedSymbol("ctr_0", create_type("int"), nonnegative=True)
assert j1.physical_coordinates[1] == TypedSymbol("ctr_1", create_type("int"), nonnegative=True)
assert j1.physical_coordinates_staggered[0] == TypedSymbol("ctr_0", create_type("int"), nonnegative=True) + 0.5
assert j1.physical_coordinates_staggered[1] == TypedSymbol("ctr_1", create_type("int"), nonnegative=True) + 0.5
assert j1.index_to_physical(index_coordinates=sp.Matrix([0, 0]), staggered=True)[0] == 0.5
assert j1.index_to_physical(index_coordinates=sp.Matrix([0, 0]), staggered=True)[1] == 0.5
assert j1.physical_to_index(physical_coordinates=sp.Matrix([0, 0]), staggered=True)[0] == -0.5
assert j1.physical_to_index(physical_coordinates=sp.Matrix([0, 0]), staggered=True)[1] == -0.5
assert j2[0, 1](1, 1) == j2.staggered_access((0, sp.Rational(1, 2)), 1)
assert j2[0, 1](1, 1) == j2.staggered_access("N", 1)
assert j2.staggered_vector_access("N") == sp.Matrix(
[j2.staggered_access("N", 0), j2.staggered_access("N", 1)]
)
assert j3[0, 1](1, 1, 1) == j3.staggered_access((0, sp.Rational(1, 2)), (1, 1))
assert j3[0, 1](1, 1, 1) == j3.staggered_access("N", (1, 1))
assert j3.staggered_vector_access("N") == sp.Matrix(
[[j3.staggered_access("N", (i, j)) for j in range(2)] for i in range(2)]
)
# D2Q9
k1, k2 = ps.fields("k1(4), k2(2) : double[2D]", field_type=FieldType.STAGGERED)
assert k1[1, 1](2) == k1.staggered_access("NE")
assert k1[0, 0](2) == k1.staggered_access("SW")
assert k1[0, 0](3) == k1.staggered_access("NW")
a = k1.staggered_access("NE")
assert a._staggered_offset(a.offsets, a.index[0]) == [
sp.Rational(1, 2),
sp.Rational(1, 2),
]
a = k1.staggered_access("SW")
assert a._staggered_offset(a.offsets, a.index[0]) == [
sp.Rational(-1, 2),
sp.Rational(-1, 2),
]
a = k1.staggered_access("NW")
assert a._staggered_offset(a.offsets, a.index[0]) == [
sp.Rational(-1, 2),
sp.Rational(1, 2),
]
# sign reversed when using as flux field
r = ps.fields("r(2) : double[2D]", field_type=FieldType.STAGGERED_FLUX)
assert r[0, 0](0) == r.staggered_access("W")
assert -r[1, 0](0) == r.staggered_access("E")
#
# Copyright © 2020 Stephan Seitz <stephan.seitz@fau.de>
#
# Distributed under terms of the GPLv3 license.
"""
"""
import pytest
from pystencils.session import *
from sympy import poly
def test_field_access_poly():
dh = ps.create_data_handling((20, 20))
ρ = dh.add_array('rho')
rho = ρ.center
a = poly(rho+0.5, rho)
print(a)
def test_field_access_piecewise():
try:
a = sp.Piecewise((0, 1 < sp.Max(-0.5, sp.Symbol("test") + 0.5)), (1, True))
a.simplify()
except Exception as e:
pytest.skip(f"Bug in SymPy 1.10: {e}")
else:
dh = ps.create_data_handling((20, 20))
ρ = dh.add_array('rho')
pw = sp.Piecewise((0, 1 < sp.Max(-0.5, ρ.center+0.5)), (1, True))
a = sp.simplify(pw)
print(a)
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
from pystencils.session import * from pystencils.session import *
from pystencils.data_types import cast_func
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
## Test field equality behaviour ## Test field equality behaviour
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
Fields create with same parameters are equal Fields create with same parameters are equal
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
f1 = ps.Field.create_generic('f', spatial_dimensions=2, index_dimensions=0) f1 = ps.Field.create_generic('f', spatial_dimensions=2, index_dimensions=0)
f2 = ps.Field.create_generic('f', spatial_dimensions=2, index_dimensions=0) f2 = ps.Field.create_generic('f', spatial_dimensions=2, index_dimensions=0)
assert f1 == f2 assert f1 == f2
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
print("Field ids equal in accesses: ", id(f1.center._field) == id(f2.center._field)) print("Field ids equal in accesses: ", id(f1.center._field) == id(f2.center._field))
print("Field accesses equal: ", f1.center == f2.center) print("Field accesses equal: ", f1.center == f2.center)
``` ```
%% Output %% Output
Field ids equal in accesses: True Field ids equal in accesses: True
Field accesses equal: True Field accesses equal: True
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
f1 = ps.Field.create_generic('f', spatial_dimensions=1, index_dimensions=0) f1 = ps.Field.create_generic('f', spatial_dimensions=1, index_dimensions=0)
f2 = ps.Field.create_generic('f', spatial_dimensions=2, index_dimensions=0) f2 = ps.Field.create_generic('f', spatial_dimensions=2, index_dimensions=0)
assert f1 != f2 assert f1 != f2
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
f1 = ps.Field.create_generic('f', spatial_dimensions=1, index_dimensions=0) f1 = ps.Field.create_generic('f', spatial_dimensions=1, index_dimensions=0)
f2 = ps.Field.create_generic('f', spatial_dimensions=1, index_dimensions=0, dtype=np.float32) f2 = ps.Field.create_generic('f', spatial_dimensions=1, index_dimensions=0, dtype=np.float32)
assert f1 != f2 assert f1 != f2
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
Properties of fields: Properties of fields:
- `field_type`: enum distinguishing normal from index or buffer fields - `field_type`: enum distinguishing normal from index or buffer fields
- `_dtype`: data type of field elements - `_dtype`: data type of field elements
- `_layout`: tuple indicating the memory linearization order - `_layout`: tuple indicating the memory linearization order
- `shape`: size of field for each dimension - `shape`: size of field for each dimension
- `strides`: number of elements to jump over to increase coordinate of this dimension by one - `strides`: number of elements to jump over to increase coordinate of this dimension by one
- `latex_name`: optional display name when field is printed as latex - `latex_name`: optional display name when field is printed as latex
Equality compare of fields: Equality compare of fields:
- field has `__eq__` and ``__hash__`` overridden - field has `__eq__` and ``__hash__`` overridden
- all parameter but `latex_name` are considered for equality - all parameter but `latex_name` are considered for equality
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
## Test field access equality behaviour ## Test field access equality behaviour
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
f1 = ps.Field.create_generic('f', spatial_dimensions=1, index_dimensions=0) f1 = ps.Field.create_generic('f', spatial_dimensions=1, index_dimensions=0)
f2 = ps.Field.create_generic('f', spatial_dimensions=1, index_dimensions=0) f2 = ps.Field.create_generic('f', spatial_dimensions=1, index_dimensions=0)
assert f1.center == f2.center assert f1.center == f2.center
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
f1 = ps.Field.create_generic('f', spatial_dimensions=1, index_dimensions=0) f1 = ps.Field.create_generic('f', spatial_dimensions=1, index_dimensions=0)
f2 = ps.Field.create_generic('f', spatial_dimensions=1, index_dimensions=0, dtype=np.float32) f2 = ps.Field.create_generic('f', spatial_dimensions=1, index_dimensions=0, dtype=np.float32)
assert f1.center != f2.center assert f1.center != f2.center
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
def print_field_accesses_debug(expr): def print_field_accesses_debug(expr):
from pystencils import Field from pystencils import Field
fas = list(expr.atoms(Field.Access)) fas = list(expr.atoms(Field.Access))
fields = list({e.field for e in fas}) fields = list({e.field for e in fas})
print("Field Accesses:") print("Field Accesses:")
for fa in fas: for fa in fas:
print(f" - {fa}, hash {hash(fa)}, offsets {fa._offsets}, index {fa._index}, {fa._hashable_content()}") print(f" - {fa}, hash {hash(fa)}, offsets {fa._offsets}, index {fa._index}, {fa._hashable_content()}")
print("") print("")
for i in range(len(fas)): for i in range(len(fas)):
for j in range(len(fas)): for j in range(len(fas)):
if not i < j: if not i < j:
continue continue
print( f" -> {i},{j} {fas[i]} == {fas[j]}: {fas[i] == {fas[j]}}") print( f" -> {i},{j} {fas[i]} == {fas[j]}: {fas[i] == {fas[j]}}")
print("Fields") print("Fields")
for f in fields: for f in fields:
print(f" - {f}, {id(f)}, shape {f.shape}, strides {f.strides}, {f._dtype}, {f.field_type}, layout {f.layout}") print(f" - {f}, {id(f)}, shape {f.shape}, strides {f.strides}, {f._dtype}, {f.field_type}, layout {f.layout}")
print("") print("")
for i in range(len(fields)): for i in range(len(fields)):
for j in range(len(fields)): for j in range(len(fields)):
if not i < j: if not i < j:
continue continue
print(f" - {fields[i]} == {fields[j]}: {fields[i] == fields[j]}, ids equal {id(fields[i])==id(fields[j])}, hash equal {hash(fields[i])==hash(fields[j])}") print(f" - {fields[i]} == {fields[j]}: {fields[i] == fields[j]}, ids equal {id(fields[i])==id(fields[j])}, hash equal {hash(fields[i])==hash(fields[j])}")
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
print_field_accesses_debug(f1.center * f2.center) print_field_accesses_debug(f1.center * f2.center)
``` ```
%% Output %% Output
Field Accesses: Field Accesses:
- f[0], hash 2177071761647211096, offsets (0,), index (), (('f_C', ('commutative', True)), ((0,), (fshape_f[0],), (fstride_f[0],), -2638709558778433189, <FieldType.GENERIC: 0>, 'f'), 0) - f[0], hash -8859424145258271267, offsets (0,), index (), ((('f_C', ('commutative', True), ('complex', True), ('extended_real', True), ('finite', True), ('hermitian', True), ('imaginary', False), ('infinite', False), ('real', True)), 2305067722319023373), ((0,), (_size_f_0,), (_stride_f_0,), <FieldType.GENERIC: 0>, 'f', None, double), 0)
- f[0], hash -219035921004479174, offsets (0,), index (), (('f_C', ('commutative', True)), ((0,), (fshape_f[0],), (fstride_f[0],), 1379426851108887372, <FieldType.GENERIC: 0>, 'f'), 0) - f[0], hash -6454673863007224785, offsets (0,), index (), ((('f_C', ('commutative', True), ('complex', True), ('extended_real', True), ('finite', True), ('hermitian', True), ('imaginary', False), ('infinite', False), ('real', True)), 4093629613697528859), ((0,), (_size_f_0,), (_stride_f_0,), <FieldType.GENERIC: 0>, 'f', None, float), 0)
-> 0,1 f[0] == f[0]: False -> 0,1 f[0] == f[0]: False
Fields Fields
- f, 139911303819560, shape (fshape_f[0],), strides (fstride_f[0],), double, FieldType.GENERIC, layout (0,) - f, 4881406800, shape (_size_f_0,), strides (_stride_f_0,), double, FieldType.GENERIC, layout (0,)
- f, 139911303820008, shape (fshape_f[0],), strides (fstride_f[0],), float, FieldType.GENERIC, layout (0,) - f, 4881445024, shape (_size_f_0,), strides (_stride_f_0,), float, FieldType.GENERIC, layout (0,)
- f == f: False, ids equal False, hash equal False - f == f: False, ids equal False, hash equal False
%% Cell type:markdown id: tags:
## Custom fields
%% Cell type:code id: tags:
``` python
from lbmpy.sparse.update_rule_sparse import *
```
%% Cell type:code id: tags:
``` python
list_field = create_symbolic_list('l', 10, 2, np.float64)
normal_field = ps.fields("f: [2D]")
normal_field.field_type = ps.FieldType.CUSTOM
t1 = normal_field.absolute_access( (list_field[1](0),), (1,))
t2 = normal_field.absolute_access( (list_field[1](1),), (1,))
t1 + t2
```
%% Output
$${{f}_{\mathbf{{l}_{1}^{1}}}^{1}} + {{f}_{\mathbf{{l}_{1}}}^{1}}$$
f_000035D373 + f_000035D373
......
import sympy as sp
import pytest
import pystencils as ps
from pystencils.astnodes import LoopOverCoordinate
from pystencils.fd import diff, diffusion, Discretization2ndOrder
from pystencils.fd.spatial import discretize_spatial, fd_stencils_isotropic, fd_stencils_standard, \
fd_stencils_forth_order_isotropic
def test_spatial_2d_unit_sum():
f = ps.fields("f: double[2D]")
h = sp.symbols("h")
terms = [diff(f, 0),
diff(f, 1),
diff(f, 0, 1),
diff(f, 0, 0),
diff(f, 1, 1),
diff(f, 0, 0) + diff(f, 1, 1)]
schemes = [fd_stencils_standard, fd_stencils_isotropic, 'standard', 'isotropic']
for term in terms:
for scheme in schemes:
discretized = discretize_spatial(term, dx=h, stencil=scheme)
_, coefficients = ps.stencil.coefficients(discretized)
assert sum(coefficients) == 0
def test_spatial_1d_unit_sum():
f = ps.fields("f: double[1D]")
h = sp.symbols("h")
terms = [diff(f, 0),
diff(f, 0, 0)]
schemes = [fd_stencils_standard, fd_stencils_isotropic, 'standard', 'isotropic']
for term in terms:
for scheme in schemes:
discretized = discretize_spatial(term, dx=h, stencil=scheme)
_, coefficients = ps.stencil.coefficients(discretized)
assert sum(coefficients) == 0
def test_fd_stencils_forth_order_isotropic():
f = ps.fields("f: double[2D]")
a = fd_stencils_forth_order_isotropic([0], 1, f[0, 0](0))
sten, coefficients = ps.stencil.coefficients(a)
assert sum(coefficients) == 0
for i, direction in enumerate(sten):
counterpart = sten.index((direction[0] * -1, direction[1] * -1))
assert coefficients[i] + coefficients[counterpart] == 0
def test_staggered_laplacian():
f = ps.fields("f : double[2D]")
a, dx = sp.symbols("a, dx")
factored_version = sum(ps.fd.Diff(a * ps.fd.Diff(f[0, 0], i), i)
for i in range(2))
expanded = ps.fd.expand_diff_full(factored_version, constants=[a])
reference = ps.fd.discretize_spatial(expanded, dx).factor()
to_test = ps.fd.discretize_spatial_staggered(factored_version, dx).factor()
assert reference == to_test
def test_staggered_combined():
from pystencils.fd import diff
f = ps.fields("f : double[2D]")
x, y = [LoopOverCoordinate.get_loop_counter_symbol(i) for i in range(2)]
dx = sp.symbols("dx")
expr = diff(x * diff(f, 0) + y * diff(f, 1), 0)
right = (x + sp.Rational(1, 2)) * (f[1, 0] - f[0, 0]) + y * (f[1, 1] - f[1, -1] + f[0, 1] - f[0, -1]) / 4
left = (x - sp.Rational(1, 2)) * (f[0, 0] - f[-1, 0]) + y * (f[-1, 1] - f[-1, -1] + f[0, 1] - f[0, -1]) / 4
reference = (right - left) / (dx ** 2)
to_test = ps.fd.discretize_spatial_staggered(expr, dx)
assert sp.expand(reference - to_test) == 0
def test_diffusion():
f = ps.fields("f(3): [2D]")
d = sp.Symbol("d")
dx = sp.Symbol("dx")
idx = 2
diffusion_term = diffusion(scalar=f, diffusion_coeff=sp.Symbol("d"), idx=idx)
discretization = Discretization2ndOrder()
expected_output = ((f[-1, 0](idx) + f[0, -1](idx) - 4 * f[0, 0](idx) + f[0, 1](idx) + f[1, 0](idx)) * d) / dx ** 2
assert sp.simplify(discretization(diffusion_term) - expected_output) == 0
with pytest.raises(ValueError):
diffusion(scalar=d, diffusion_coeff=sp.Symbol("d"), idx=idx)
# -*- coding: utf-8 -*-
#
# Copyright © 2019 Stephan Seitz <stephan.seitz@fau.de>
#
# Distributed under terms of the GPLv3 license.
"""
"""
import sympy as sp
import pystencils
from pystencils.typing import create_type
def test_floor_ceil_int_optimization():
x, y = pystencils.fields('x,y: int32[2d]')
a, b, c = sp.symbols('a, b, c')
int_symbol = sp.Symbol('int_symbol', integer=True)
typed_symbol = pystencils.TypedSymbol('typed_symbol', create_type('int64'))
assignments = pystencils.AssignmentCollection({
a: sp.floor(1),
b: sp.ceiling(typed_symbol),
c: sp.floor(int_symbol),
y.center(): sp.ceiling(x.center()) + sp.floor(x.center())
})
assert(typed_symbol.is_integer)
print(sp.simplify(sp.ceiling(typed_symbol)))
print(assignments)
wild_floor = sp.floor(sp.Wild('w1'))
assert not sp.floor(int_symbol).match(wild_floor)
assert sp.floor(a).match(wild_floor)
assert not assignments.find(wild_floor)
def test_floor_ceil_float_no_optimization():
x, y = pystencils.fields('x,y: float32[2d]')
a, b, c = sp.symbols('a, b, c')
int_symbol = sp.Symbol('int_symbol', integer=True)
typed_symbol = pystencils.TypedSymbol('typed_symbol', create_type('float32'))
assignments = pystencils.AssignmentCollection({
a: sp.floor(1),
b: sp.ceiling(typed_symbol),
c: sp.floor(int_symbol),
y.center(): sp.ceiling(x.center()) + sp.floor(x.center())
})
assert not typed_symbol.is_integer
print(sp.simplify(sp.ceiling(typed_symbol)))
print(assignments)
wild_floor = sp.floor(sp.Wild('w1'))
assert not sp.floor(int_symbol).match(wild_floor)
assert sp.floor(a).match(wild_floor)
assert assignments.find(wild_floor)
import sympy as sp
import pystencils as ps
import numpy as np
import pytest
from itertools import product
from pystencils.rng import random_symbol
from pystencils.astnodes import SympyAssignment
from pystencils.node_collection import NodeCollection
def advection_diffusion(dim: int):
# parameters
if dim == 2:
L = (32, 32)
elif dim == 3:
L = (16, 16, 16)
dh = ps.create_data_handling(domain_size=L, periodicity=True, default_target=ps.Target.CPU)
n_field = dh.add_array('n', values_per_cell=1)
j_field = dh.add_array('j', values_per_cell=3 ** dim // 2, field_type=ps.FieldType.STAGGERED_FLUX)
velocity_field = dh.add_array('v', values_per_cell=dim)
D = 0.0666
time = 100
def grad(f):
return sp.Matrix([ps.fd.diff(f, i) for i in range(dim)])
flux_eq = - D * grad(n_field)
fvm_eq = ps.fd.FVM1stOrder(n_field, flux=flux_eq)
vof_adv = ps.fd.VOF(j_field, velocity_field, n_field)
# merge calculation of advection and diffusion terms
flux = []
for adv, div in zip(vof_adv, fvm_eq.discrete_flux(j_field)):
assert adv.lhs == div.lhs
flux.append(ps.Assignment(adv.lhs, adv.rhs + div.rhs))
flux_kernel = ps.create_staggered_kernel(flux).compile()
pde_kernel = ps.create_kernel(fvm_eq.discrete_continuity(j_field)).compile()
sync_conc = dh.synchronization_function([n_field.name])
# analytical density calculation
def density(pos: np.ndarray, time: int, D: float):
return (4 * np.pi * D * time)**(-dim / 2) * \
np.exp(-np.sum(np.square(pos), axis=-1) / (4 * D * time))
pos = np.zeros((*L, dim))
xpos = np.arange(-L[0] // 2, L[0] // 2)
ypos = np.arange(-L[1] // 2, L[1] // 2)
if dim == 2:
pos[..., 1], pos[..., 0] = np.meshgrid(xpos, ypos)
elif dim == 3:
zpos = np.arange(-L[2] // 2, L[2] // 2)
pos[..., 2], pos[..., 1], pos[..., 0] = np.meshgrid(xpos, ypos, zpos)
pos += 0.5
def run(velocity: np.ndarray, time: int):
dh.fill(n_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(j_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
# set initial values for velocity and density
for i in range(dim):
dh.fill(velocity_field.name, velocity[i], i, ghost_layers=True, inner_ghost_layers=True)
dh.fill(n_field.name, 0)
if dim == 2:
start = ps.make_slice[L[0] // 2 - 1:L[0] // 2 + 1, L[1] // 2 - 1:L[1] // 2 + 1]
else:
start = ps.make_slice[L[0] // 2 - 1:L[0] // 2 + 1, L[1] // 2 - 1:L[1] // 2 + 1,
L[2] // 2 - 1:L[2] // 2 + 1]
dh.fill(n_field.name, 2**-dim, slice_obj=start)
sync_conc()
for i in range(time):
dh.run_kernel(flux_kernel)
dh.run_kernel(pde_kernel)
sync_conc()
sim_density = dh.gather_array(n_field.name)
# check that mass was conserved
assert np.isclose(sim_density.sum(), 1)
assert np.all(sim_density > 0)
# check that the maximum is in the right place
peak = np.unravel_index(np.argmax(sim_density, axis=None), sim_density.shape)
assert np.allclose(peak, np.array(L) // 2 - 0.5 + velocity * time, atol=0.5)
# check the concentration profile
if np.linalg.norm(velocity) == 0:
calc_density = density(pos - velocity * time, time, D)
target = [time, D]
pytest.importorskip('scipy.optimize')
from scipy.optimize import curve_fit
popt, _ = curve_fit(lambda x, t, D: density(x - velocity * time, t, D),
pos.reshape(-1, dim),
sim_density.reshape(-1),
p0=target)
assert np.isclose(popt[0], time, rtol=0.1)
assert np.isclose(popt[1], D, rtol=0.1)
assert np.allclose(calc_density, sim_density, atol=1e-4)
return lambda v: run(np.array(v), time)
advection_diffusion.runners = {}
@pytest.mark.parametrize("velocity", list(product([0, -0.047, 0.041], [0, -0.031, 0.023])))
def test_advection_diffusion_2d(velocity):
if 2 not in advection_diffusion.runners:
advection_diffusion.runners[2] = advection_diffusion(2)
advection_diffusion.runners[2](velocity)
@pytest.mark.parametrize("velocity", list(product([0, -0.047, 0.041], [0, -0.031, 0.023], [0, -0.017, 0.011])))
@pytest.mark.longrun
def test_advection_diffusion_3d(velocity):
if 3 not in advection_diffusion.runners:
advection_diffusion.runners[3] = advection_diffusion(3)
advection_diffusion.runners[3](velocity)
def advection_diffusion_fluctuations(dim: int):
# parameters
if dim == 2:
L = (32, 32)
stencil_factor = np.sqrt(1 / (1 + np.sqrt(2)))
elif dim == 3:
L = (16, 16, 16)
stencil_factor = np.sqrt(1 / (1 + 2 * np.sqrt(2) + 4.0 / 3.0 * np.sqrt(3)))
dh = ps.create_data_handling(domain_size=L, periodicity=True, default_target=ps.Target.CPU)
n_field = dh.add_array('n', values_per_cell=1)
j_field = dh.add_array('j', values_per_cell=3 ** dim // 2, field_type=ps.FieldType.STAGGERED_FLUX)
velocity_field = dh.add_array('v', values_per_cell=dim)
D = 0.00666
time = 10000
def grad(f):
return sp.Matrix([ps.fd.diff(f, i) for i in range(dim)])
flux_eq = - D * grad(n_field)
fvm_eq = ps.fd.FVM1stOrder(n_field, flux=flux_eq)
vof_adv = ps.fd.VOF(j_field, velocity_field, n_field)
# merge calculation of advection and diffusion terms
flux = []
for adv, div in zip(vof_adv, fvm_eq.discrete_flux(j_field)):
assert adv.lhs == div.lhs
flux.append(ps.Assignment(adv.lhs, adv.rhs + div.rhs))
flux = ps.AssignmentCollection(flux)
rng_symbol_gen = random_symbol(flux.subexpressions, dim=dh.dim)
for i in range(len(flux.main_assignments)):
n = j_field.staggered_stencil[i]
assert flux.main_assignments[i].lhs == j_field.staggered_access(n)
# calculate mean density
dens = (n_field.neighbor_vector(n) + n_field.center_vector)[0] / 2
# multyply by smoothed haviside function so that fluctuation will not get bigger that the density
dens *= sp.Max(0, sp.Min(1.0, n_field.neighbor_vector(n)[0]) * sp.Min(1.0, n_field.center_vector[0]))
# lenght of the vector
length = sp.sqrt(len(j_field.staggered_stencil[i]))
# amplitude of the random fluctuations
fluct = sp.sqrt(2 * dens * D) * sp.sqrt(1 / length) * stencil_factor
# add fluctuations
fluct *= 2 * (next(rng_symbol_gen) - 0.5) * sp.sqrt(3)
flux.main_assignments[i] = ps.Assignment(flux.main_assignments[i].lhs, flux.main_assignments[i].rhs + fluct)
# Add the folding to the flux, so that the random numbers persist through the ghostlayers.
fold = {ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i):
ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i) % L[i] for i in range(len(L))}
flux.subs(fold)
flux_kernel = ps.create_staggered_kernel(flux).compile()
pde_kernel = ps.create_kernel(fvm_eq.discrete_continuity(j_field)).compile()
sync_conc = dh.synchronization_function([n_field.name])
# analytical density distribution calculation
def P(rho, density_init):
res = []
for r in rho:
res.append(np.power(density_init, r) * np.exp(-density_init) / np.math.gamma(r + 1))
return np.array(res)
def run(density_init: float, velocity: np.ndarray, time: int):
dh.fill(n_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(j_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
# set initial values for velocity and density
for i in range(dim):
dh.fill(velocity_field.name, velocity[i], i, ghost_layers=True, inner_ghost_layers=True)
dh.fill(n_field.name, density_init)
measurement_intervall = 10
warm_up = 1000
data = []
sync_conc()
for i in range(warm_up):
dh.run_kernel(flux_kernel, seed=42, time_step=i)
dh.run_kernel(pde_kernel)
sync_conc()
for i in range(time):
dh.run_kernel(flux_kernel, seed=42, time_step=i + warm_up)
dh.run_kernel(pde_kernel)
sync_conc()
if(i % measurement_intervall == 0):
data = np.append(data, dh.gather_array(n_field.name).ravel(), 0)
# test mass conservation
np.testing.assert_almost_equal(dh.gather_array(n_field.name).mean(), density_init)
n_bins = 50
density_value, bins = np.histogram(data, density=True, bins=n_bins)
bins_mean = bins[:-1] + (bins[1:] - bins[:-1]) / 2
analytical_value = P(bins_mean, density_init)
print(density_value - analytical_value)
np.testing.assert_allclose(density_value, analytical_value, atol=2e-3)
return lambda density_init, v: run(density_init, np.array(v), time)
advection_diffusion_fluctuations.runners = {}
@pytest.mark.parametrize("velocity", list(product([0, 0.00041], [0, -0.00031])))
@pytest.mark.parametrize("density", [27.0, 56.5])
@pytest.mark.longrun
def test_advection_diffusion_fluctuation_2d(density, velocity):
if 2 not in advection_diffusion_fluctuations.runners:
advection_diffusion_fluctuations.runners[2] = advection_diffusion_fluctuations(2)
advection_diffusion_fluctuations.runners[2](density, velocity)
@pytest.mark.parametrize("velocity", [(0.0, 0.0, 0.0), (0.00043, -0.00017, 0.00028)])
@pytest.mark.parametrize("density", [27.0, 56.5])
@pytest.mark.longrun
def test_advection_diffusion_fluctuation_3d(density, velocity):
if 3 not in advection_diffusion_fluctuations.runners:
advection_diffusion_fluctuations.runners[3] = advection_diffusion_fluctuations(3)
advection_diffusion_fluctuations.runners[3](density, velocity)
def diffusion_reaction(fluctuations: bool):
# parameters
L = (32, 32)
stencil_factor = np.sqrt(1 / (1 + np.sqrt(2)))
dh = ps.create_data_handling(domain_size=L, periodicity=True, default_target=ps.Target.CPU)
species = 2
n_fields = []
j_fields = []
r_flux_fields = []
for i in range(species):
n_fields.append(dh.add_array(f'n_{i}', values_per_cell=1))
j_fields.append(dh.add_array(f'j_{i}', values_per_cell=3 ** dh.dim // 2,
field_type=ps.FieldType.STAGGERED_FLUX))
r_flux_fields.append(dh.add_array(f'r_{i}', values_per_cell=1))
velocity_field = dh.add_array('v', values_per_cell=dh.dim)
D = 0.00666
time = 1000
r_order = [2.0, 0.0]
r_rate_const = 0.00001
r_coefs = [-2, 1]
def grad(f):
return sp.Matrix([ps.fd.diff(f, i) for i in range(dh.dim)])
flux_eq = - D * grad(n_fields[0])
fvm_eq = ps.fd.FVM1stOrder(n_fields[0], flux=flux_eq)
vof_adv = ps.fd.VOF(j_fields[0], velocity_field, n_fields[0])
continuity_assignments = fvm_eq.discrete_continuity(j_fields[0])
# merge calculation of advection and diffusion terms
flux = []
for adv, div in zip(vof_adv, fvm_eq.discrete_flux(j_fields[0])):
assert adv.lhs == div.lhs
flux.append(ps.Assignment(adv.lhs, adv.rhs + div.rhs))
flux = ps.AssignmentCollection(flux)
if(fluctuations):
rng_symbol_gen = random_symbol(flux.subexpressions, dim=dh.dim)
for i in range(len(flux.main_assignments)):
n = j_fields[0].staggered_stencil[i]
assert flux.main_assignments[i].lhs == j_fields[0].staggered_access(n)
# calculate mean density
dens = (n_fields[0].neighbor_vector(n) + n_fields[0].center_vector)[0] / 2
# multyply by smoothed haviside function so that fluctuation will not get bigger that the density
dens *= sp.Max(0,
sp.Min(1.0, n_fields[0].neighbor_vector(n)[0]) * sp.Min(1.0, n_fields[0].center_vector[0]))
# lenght of the vector
length = sp.sqrt(len(j_fields[0].staggered_stencil[i]))
# amplitude of the random fluctuations
fluct = sp.sqrt(2 * dens * D) * sp.sqrt(1 / length) * stencil_factor
# add fluctuations
fluct *= 2 * (next(rng_symbol_gen) - 0.5) * sp.sqrt(3)
flux.main_assignments[i] = ps.Assignment(flux.main_assignments[i].lhs, flux.main_assignments[i].rhs + fluct)
# Add the folding to the flux, so that the random numbers persist through the ghostlayers.
fold = {ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i):
ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i) % L[i] for i in range(len(L))}
flux.subs(fold)
r_flux = NodeCollection([SympyAssignment(j_fields[i].center, 0) for i in range(species)])
reaction = r_rate_const
for i in range(species):
reaction *= sp.Pow(n_fields[i].center, r_order[i])
new_assignments = []
if fluctuations:
rng_symbol_gen = random_symbol(new_assignments, dim=dh.dim)
reaction_fluctuations = sp.sqrt(sp.Abs(reaction)) * 2 * (next(rng_symbol_gen) - 0.5) * sp.sqrt(3)
reaction_fluctuations *= sp.Min(1, sp.Abs(reaction**2))
else:
reaction_fluctuations = 0.0
for i in range(species):
r_flux.all_assignments[i] = SympyAssignment(
r_flux_fields[i].center, (reaction + reaction_fluctuations) * r_coefs[i])
[r_flux.all_assignments.insert(0, new) for new in new_assignments]
continuity_assignments = [SympyAssignment(*assignment.args) for assignment in continuity_assignments]
continuity_assignments.append(SympyAssignment(n_fields[0].center, n_fields[0].center + r_flux_fields[0].center))
flux_kernel = ps.create_staggered_kernel(flux).compile()
reaction_kernel = ps.create_kernel(r_flux).compile()
config = ps.CreateKernelConfig(allow_double_writes=True)
pde_kernel = ps.create_kernel(continuity_assignments, config=config).compile()
sync_conc = dh.synchronization_function([n_fields[0].name, n_fields[1].name])
def f(t, r, n0, fac, fluctuations):
"""Calculates the amount of product created after a certain time of a reaction with form xA -> B
Args:
t: Time of the reation
r: Reaction rate constant
n0: Initial density of the
fac: Reaction order of A (this in most cases equals the stochometric coefficient x)
fluctuations: Boolian whether fluctuations were included during the reaction.
"""
if fluctuations:
return 1 / fac * (n0 + n0 / (n0 - (n0 + 1) * np.exp(fac * r * t)))
return 1 / fac * (n0 - (1 / (fac * r * t + (1 / n0))))
def run(density_init: float, velocity: np.ndarray, time: int):
for i in range(species):
dh.fill(n_fields[i].name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(j_fields[i].name, 0.0, ghost_layers=True, inner_ghost_layers=True)
dh.fill(r_flux_fields[i].name, 0.0, ghost_layers=True, inner_ghost_layers=True)
# set initial values for velocity and density
for i in range(dh.dim):
dh.fill(velocity_field.name, velocity[i], i, ghost_layers=True, inner_ghost_layers=True)
dh.fill(n_fields[0].name, density_init)
dh.fill(n_fields[1].name, 0.0)
measurement_intervall = 10
data = []
sync_conc()
for i in range(time):
if(i % measurement_intervall == 0):
data.append([i, dh.gather_array(n_fields[1].name).mean(), dh.gather_array(n_fields[0].name).mean()])
dh.run_kernel(reaction_kernel, seed=41, time_step=i)
for s_idx in range(species):
flux_kernel(n_0=dh.cpu_arrays[n_fields[s_idx].name],
j_0=dh.cpu_arrays[j_fields[s_idx].name],
v=dh.cpu_arrays[velocity_field.name], seed=42 + s_idx, time_step=i)
pde_kernel(n_0=dh.cpu_arrays[n_fields[s_idx].name],
j_0=dh.cpu_arrays[j_fields[s_idx].name],
r_0=dh.cpu_arrays[r_flux_fields[s_idx].name])
sync_conc()
data = np.array(data).transpose()
x = data[0]
analytical_value = f(x, r_rate_const, density_init, abs(r_coefs[0]), fluctuations)
# test mass conservation
np.testing.assert_almost_equal(
dh.gather_array(n_fields[0].name).mean() + 2 * dh.gather_array(n_fields[1].name).mean(), density_init)
r_tol = 2e-3
if fluctuations:
r_tol = 3e-2
np.testing.assert_allclose(data[1], analytical_value, rtol=r_tol)
return lambda density_init, v: run(density_init, np.array(v), time)
advection_diffusion_fluctuations.runners = {}
@pytest.mark.parametrize("velocity", list(product([0, 0.0041], [0, -0.0031])))
@pytest.mark.parametrize("density", [27.0, 56.5])
@pytest.mark.parametrize("fluctuations", [False, True])
@pytest.mark.longrun
def test_diffusion_reaction(fluctuations, density, velocity):
diffusion_reaction.runner = diffusion_reaction(fluctuations)
diffusion_reaction.runner(density, velocity)
def VOF2(j: ps.field.Field, v: ps.field.Field, ρ: ps.field.Field, simplify=True):
"""Volume-of-fluid discretization of advection
Args:
j: the staggered field to write the fluxes to. Should have a D2Q9/D3Q27 stencil. Other stencils work too, but
incur a small error (D2Q5/D3Q7: v^2, D3Q19: v^3).
v: the flow velocity field
ρ: the quantity to advect
simplify: whether to simplify the generated expressions (slow, but makes them much more readable and faster)
"""
dim = j.spatial_dimensions
assert ps.FieldType.is_staggered(j)
def assume_velocity(e):
if not simplify:
return e
repl = {}
for c in e.atoms(sp.StrictGreaterThan, sp.GreaterThan):
if isinstance(c.lhs, ps.field.Field.Access) and c.lhs.field == v and isinstance(c.rhs, sp.Number):
if c.rhs <= -1:
repl[c] = True
elif c.rhs >= 1:
repl[c] = False
for c in e.atoms(sp.StrictLessThan, sp.LessThan):
if isinstance(c.lhs, ps.field.Field.Access) and c.lhs.field == v and isinstance(c.rhs, sp.Number):
if c.rhs >= 1:
repl[c] = True
elif c.rhs <= -1:
repl[c] = False
for c in e.atoms(sp.Equality):
if isinstance(c.lhs, ps.field.Field.Access) and c.lhs.field == v and isinstance(c.rhs, sp.Number):
if c.rhs <= -1 or c.rhs >= 1:
repl[c] = False
return e.subs(repl)
class AABB:
def __init__(self, corner0, corner1):
self.dim = len(corner0)
self.minCorner = sp.zeros(self.dim, 1)
self.maxCorner = sp.zeros(self.dim, 1)
for i in range(self.dim):
self.minCorner[i] = sp.Piecewise((corner0[i], corner0[i] < corner1[i]), (corner1[i], True))
self.maxCorner[i] = sp.Piecewise((corner1[i], corner0[i] < corner1[i]), (corner0[i], True))
def intersect(self, other):
minCorner = [sp.Max(self.minCorner[d], other.minCorner[d]) for d in range(self.dim)]
maxCorner = [sp.Max(minCorner[d], sp.Min(self.maxCorner[d], other.maxCorner[d]))
for d in range(self.dim)]
return AABB(minCorner, maxCorner)
@property
def volume(self):
v = sp.prod([self.maxCorner[d] - self.minCorner[d] for d in range(self.dim)])
if simplify:
return sp.simplify(assume_velocity(v.rewrite(sp.Piecewise)))
else:
return v
fluxes = []
cell = AABB([-0.5] * dim, [0.5] * dim)
cell_s = AABB(sp.Matrix([-0.5] * dim) + v.center_vector, sp.Matrix([0.5] * dim) + v.center_vector)
for d, neighbor in enumerate(j.staggered_stencil):
c = sp.Matrix(ps.stencil.direction_string_to_offset(neighbor)[:dim])
cell_n = AABB(sp.Matrix([-0.5] * dim) + c, sp.Matrix([0.5] * dim) + c)
cell_ns = AABB(sp.Matrix([-0.5] * dim) + c + v.neighbor_vector(neighbor),
sp.Matrix([0.5] * dim) + c + v.neighbor_vector(neighbor))
fluxes.append(assume_velocity(ρ.center_vector * cell_s.intersect(cell_n).volume
- ρ.neighbor_vector(neighbor) * cell_ns.intersect(cell).volume))
assignments = []
for i, d in enumerate(j.staggered_stencil):
for lhs, rhs in zip(j.staggered_vector_access(d).values(), fluxes[i].values()):
assignments.append(ps.Assignment(lhs, rhs))
return assignments
@pytest.mark.parametrize("dim", [2, 3])
def test_advection(dim):
L = (8,) * dim
dh = ps.create_data_handling(L, periodicity=True, default_target=ps.Target.CPU)
c = dh.add_array('c', values_per_cell=1)
j = dh.add_array('j', values_per_cell=3 ** dh.dim // 2, field_type=ps.FieldType.STAGGERED_FLUX)
u = dh.add_array('u', values_per_cell=dh.dim)
dh.cpu_arrays[c.name][:] = (np.random.random([l + 2 for l in L]))
dh.cpu_arrays[u.name][:] = (np.random.random([l + 2 for l in L] + [dim]) - 0.5) / 5
vof1 = ps.create_kernel(ps.fd.VOF(j, u, c)).compile()
dh.fill(j.name, np.nan, ghost_layers=True)
dh.run_kernel(vof1)
j1 = dh.gather_array(j.name).copy()
vof2 = ps.create_kernel(VOF2(j, u, c, simplify=False)).compile()
dh.fill(j.name, np.nan, ghost_layers=True)
dh.run_kernel(vof2)
j2 = dh.gather_array(j.name)
assert np.allclose(j1, j2)
@pytest.mark.parametrize("stencil", ["D2Q5", "D2Q9"])
def test_ek(stencil):
# parameters
L = (40, 40)
D = sp.Symbol("D")
z = sp.Symbol("z")
# data structures
dh = ps.create_data_handling(L, periodicity=True, default_target=ps.Target.CPU)
c = dh.add_array('c', values_per_cell=1)
j = dh.add_array('j', values_per_cell=int(stencil[-1]) // 2, field_type=ps.FieldType.STAGGERED_FLUX)
Phi = dh.add_array('Φ', values_per_cell=1)
# perform automatic discretization
def Gradient(f):
return sp.Matrix([ps.fd.diff(f, i) for i in range(dh.dim)])
flux_eq = -D * Gradient(c) + D * z * c.center * Gradient(Phi)
disc = ps.fd.FVM1stOrder(c, flux_eq)
flux_assignments = disc.discrete_flux(j)
continuity_assignments = disc.discrete_continuity(j)
# manual discretization
x_staggered = - c[-1, 0] + c[0, 0] + z * (c[-1, 0] + c[0, 0]) / 2 * (Phi[-1, 0] - Phi[0, 0])
y_staggered = - c[0, -1] + c[0, 0] + z * (c[0, -1] + c[0, 0]) / 2 * (Phi[0, -1] - Phi[0, 0])
xy_staggered = (- c[-1, -1] + c[0, 0]) / sp.sqrt(2) + \
z * (c[-1, -1] + c[0, 0]) / 2 * (Phi[-1, -1] - Phi[0, 0]) / sp.sqrt(2)
xY_staggered = (- c[-1, 1] + c[0, 0]) / sp.sqrt(2) + \
z * (c[-1, 1] + c[0, 0]) / 2 * (Phi[-1, 1] - Phi[0, 0]) / sp.sqrt(2)
A0 = (1 + sp.sqrt(2) if j.index_shape[0] == 4 else 1)
jj = j.staggered_access
divergence = -1 * sum([jj(d) for d in j.staggered_stencil
+ [ps.stencil.inverse_direction_string(d) for d in j.staggered_stencil]])
update = [ps.Assignment(c.center, c.center + divergence)]
flux = [ps.Assignment(j.staggered_access("W"), D * x_staggered / A0),
ps.Assignment(j.staggered_access("S"), D * y_staggered / A0)]
if j.index_shape[0] == 4:
flux += [ps.Assignment(j.staggered_access("SW"), D * xy_staggered / A0),
ps.Assignment(j.staggered_access("NW"), D * xY_staggered / A0)]
# compare
for a, b in zip(flux, flux_assignments):
assert a.lhs == b.lhs
assert sp.simplify(a.rhs - b.rhs) == 0
for a, b in zip(update, continuity_assignments):
assert a.lhs == b.lhs
assert a.rhs == b.rhs
# TODO: test source
@pytest.mark.parametrize("stencil", ["D2Q5", "D2Q9", "D3Q7", "D3Q19", "D3Q27"])
@pytest.mark.parametrize("derivative", [0, 1])
def test_flux_stencil(stencil, derivative):
L = (40, ) * int(stencil[1])
dh = ps.create_data_handling(L, periodicity=True, default_target=ps.Target.CPU)
c = dh.add_array('c', values_per_cell=1)
j = dh.add_array('j', values_per_cell=int(stencil[3:]) // 2, field_type=ps.FieldType.STAGGERED_FLUX)
def Gradient(f):
return sp.Matrix([ps.fd.diff(f, i) for i in range(dh.dim)])
eq = [sp.Matrix([sp.Symbol(f"a_{i}") * c.center for i in range(dh.dim)]), Gradient(c)][derivative]
disc = ps.fd.FVM1stOrder(c, flux=eq)
# check the continuity
continuity_assignments = disc.discrete_continuity(j)
assert [len(a.rhs.atoms(ps.field.Field.Access)) for a in continuity_assignments] == \
[int(stencil[3:])] * len(continuity_assignments)
# check the flux
flux_assignments = disc.discrete_flux(j)
assert [len(a.rhs.atoms(ps.field.Field.Access)) for a in flux_assignments] == [2] * len(flux_assignments)
@pytest.mark.parametrize("stencil", ["D2Q5", "D2Q9", "D3Q7", "D3Q19", "D3Q27"])
def test_source_stencil(stencil):
L = (40, ) * int(stencil[1])
dh = ps.create_data_handling(L, periodicity=True, default_target=ps.Target.CPU)
c = dh.add_array('c', values_per_cell=1)
j = dh.add_array('j', values_per_cell=int(stencil[3:]) // 2, field_type=ps.FieldType.STAGGERED_FLUX)
continuity_ref = ps.fd.FVM1stOrder(c).discrete_continuity(j)
for eq in [c.center] + [ps.fd.diff(c, i) for i in range(dh.dim)]:
disc = ps.fd.FVM1stOrder(c, source=eq)
diff = sp.simplify(disc.discrete_continuity(j)[0].rhs - continuity_ref[0].rhs)
if type(eq) is ps.field.Field.Access:
assert len(diff.atoms(ps.field.Field.Access)) == 1
else:
assert len(diff.atoms(ps.field.Field.Access)) == 2
def test_fvm_staggered_simplification():
D = sp.Symbol("D")
data_type = "float64"
c = ps.fields(f"c: {data_type}[2D]", layout='fzyx')
j = ps.fields(f"j(2): {data_type}[2D]", layout='fzyx', field_type=ps.FieldType.STAGGERED_FLUX)
grad_c = sp.Matrix([ps.fd.diff(c, i) for i in range(c.spatial_dimensions)])
ek = ps.fd.FVM1stOrder(c, flux=-D * grad_c)
ast = ps.create_staggered_kernel(ek.discrete_flux(j))
code = ps.get_code_str(ast)
assert '_size_c_0 - 1 < _size_c_0 - 1' not in code
import sympy
import pystencils.astnodes
from pystencils.backends.cbackend import CBackend
from pystencils.typing import TypedSymbol
class BogusDeclaration(pystencils.astnodes.Node):
"""Base class for all AST nodes."""
def __init__(self, parent=None):
self.parent = parent
@property
def args(self):
"""Returns all arguments/children of this node."""
return set()
@property
def symbols_defined(self):
"""Set of symbols which are defined by this node."""
return {TypedSymbol('Foo', 'double')}
@property
def undefined_symbols(self):
"""Symbols which are used but are not defined inside this node."""
set()
def subs(self, subs_dict):
"""Inplace! substitute, similar to sympy's but modifies the AST inplace."""
for a in self.args:
a.subs(subs_dict)
@property
def func(self):
return self.__class__
def atoms(self, arg_type):
"""Returns a set of all descendants recursively, which are an instance of the given type."""
result = set()
for arg in self.args:
if isinstance(arg, arg_type):
result.add(arg)
result.update(arg.atoms(arg_type))
return result
class BogusUsage(pystencils.astnodes.Node):
"""Base class for all AST nodes."""
def __init__(self, requires_global: bool, parent=None):
self.parent = parent
if requires_global:
self.required_global_declarations = [BogusDeclaration()]
@property
def args(self):
"""Returns all arguments/children of this node."""
return set()
@property
def symbols_defined(self):
"""Set of symbols which are defined by this node."""
return set()
@property
def undefined_symbols(self):
"""Symbols which are used but are not defined inside this node."""
return {TypedSymbol('Foo', 'double')}
def subs(self, subs_dict):
"""Inplace! substitute, similar to sympy's but modifies the AST inplace."""
for a in self.args:
a.subs(subs_dict)
@property
def func(self):
return self.__class__
def atoms(self, arg_type):
"""Returns a set of all descendants recursively, which are an instance of the given type."""
result = set()
for arg in self.args:
if isinstance(arg, arg_type):
result.add(arg)
result.update(arg.atoms(arg_type))
return result
def test_global_definitions_with_global_symbol():
# Teach our printer to print new ast nodes
CBackend._print_BogusUsage = lambda _, __: "// Bogus would go here"
CBackend._print_BogusDeclaration = lambda _, __: "// Declaration would go here"
z, x, y = pystencils.fields("z, y, x: [2d]")
normal_assignments = pystencils.AssignmentCollection([pystencils.Assignment(
z[0, 0], x[0, 0] * x[0, 0] * y[0, 0])], [])
ast = pystencils.create_kernel(normal_assignments)
print(pystencils.show_code(ast))
ast.body.append(BogusUsage(requires_global=True))
print(pystencils.show_code(ast))
kernel = ast.compile()
assert kernel is not None
assert TypedSymbol('Foo', 'double') not in [p.symbol for p in ast.get_parameters()]
def test_global_definitions_without_global_symbol():
# Teach our printer to print new ast nodes
CBackend._print_BogusUsage = lambda _, __: "// Bogus would go here"
CBackend._print_BogusDeclaration = lambda _, __: "// Declaration would go here"
z, x, y = pystencils.fields("z, y, x: [2d]")
normal_assignments = pystencils.AssignmentCollection([pystencils.Assignment(
z[0, 0], x[0, 0] * x[0, 0] * y[0, 0])], [])
ast = pystencils.create_kernel(normal_assignments)
print(pystencils.show_code(ast))
ast.body.append(BogusUsage(requires_global=False))
print(pystencils.show_code(ast))
kernel = ast.compile()
assert kernel is not None
assert TypedSymbol('Foo', 'double') in [p.symbol for p in ast.get_parameters()]
import pytest
import numpy as np import numpy as np
import sympy as sp import sympy as sp
from pystencils import Field, Assignment import math
from pystencils.simp import sympy_cse_on_assignment_list
from pystencils.gpucuda.indexing import LineIndexing
from pystencils.slicing import remove_ghost_layers, add_ghost_layers, make_slice
from pystencils.gpucuda import make_python_function, create_cuda_kernel
import pycuda.gpuarray as gpuarray
from scipy.ndimage import convolve from scipy.ndimage import convolve
from pystencils import Assignment, Field, fields, CreateKernelConfig, create_kernel, Target, get_code_str
from pystencils.gpu import BlockIndexing
from pystencils.simp import sympy_cse_on_assignment_list
from pystencils.slicing import add_ghost_layers, make_slice, remove_ghost_layers, normalize_slice
try:
import cupy as cp
device_numbers = range(cp.cuda.runtime.getDeviceCount())
except ImportError:
device_numbers = []
cp = None
def test_averaging_kernel(): def test_averaging_kernel():
pytest.importorskip('cupy')
size = (40, 55) size = (40, 55)
src_arr = np.random.rand(*size) src_arr = np.random.rand(*size)
src_arr = add_ghost_layers(src_arr) src_arr = add_ghost_layers(src_arr)
...@@ -20,13 +30,14 @@ def test_averaging_kernel(): ...@@ -20,13 +30,14 @@ def test_averaging_kernel():
update_rule = Assignment(dst_field[0, 0], update_rule = Assignment(dst_field[0, 0],
(src_field[0, 1] + src_field[0, -1] + src_field[1, 0] + src_field[-1, 0]) / 4) (src_field[0, 1] + src_field[0, -1] + src_field[1, 0] + src_field[-1, 0]) / 4)
ast = create_cuda_kernel(sympy_cse_on_assignment_list([update_rule])) config = CreateKernelConfig(target=Target.GPU)
kernel = make_python_function(ast) ast = create_kernel(sympy_cse_on_assignment_list([update_rule]), config=config)
kernel = ast.compile()
gpu_src_arr = gpuarray.to_gpu(src_arr) gpu_src_arr = cp.asarray(src_arr)
gpu_dst_arr = gpuarray.to_gpu(dst_arr) gpu_dst_arr = cp.asarray(dst_arr)
kernel(src=gpu_src_arr, dst=gpu_dst_arr) kernel(src=gpu_src_arr, dst=gpu_dst_arr)
gpu_dst_arr.get(dst_arr) dst_arr = gpu_dst_arr.get()
stencil = np.array([[0, 1, 0], [1, 0, 1], [0, 1, 0]]) / 4.0 stencil = np.array([[0, 1, 0], [1, 0, 1], [0, 1, 0]]) / 4.0
reference = convolve(remove_ghost_layers(src_arr), stencil, mode='constant', cval=0.0) reference = convolve(remove_ghost_layers(src_arr), stencil, mode='constant', cval=0.0)
...@@ -35,24 +46,26 @@ def test_averaging_kernel(): ...@@ -35,24 +46,26 @@ def test_averaging_kernel():
def test_variable_sized_fields(): def test_variable_sized_fields():
pytest.importorskip('cupy')
src_field = Field.create_generic('src', spatial_dimensions=2) src_field = Field.create_generic('src', spatial_dimensions=2)
dst_field = Field.create_generic('dst', spatial_dimensions=2) dst_field = Field.create_generic('dst', spatial_dimensions=2)
update_rule = Assignment(dst_field[0, 0], update_rule = Assignment(dst_field[0, 0],
(src_field[0, 1] + src_field[0, -1] + src_field[1, 0] + src_field[-1, 0]) / 4) (src_field[0, 1] + src_field[0, -1] + src_field[1, 0] + src_field[-1, 0]) / 4)
ast = create_cuda_kernel(sympy_cse_on_assignment_list([update_rule])) config = CreateKernelConfig(target=Target.GPU)
kernel = make_python_function(ast) ast = create_kernel(sympy_cse_on_assignment_list([update_rule]), config=config)
kernel = ast.compile()
size = (3, 3) size = (3, 3)
src_arr = np.random.rand(*size) src_arr = np.random.rand(*size)
src_arr = add_ghost_layers(src_arr) src_arr = add_ghost_layers(src_arr)
dst_arr = np.zeros_like(src_arr) dst_arr = np.zeros_like(src_arr)
gpu_src_arr = gpuarray.to_gpu(src_arr) gpu_src_arr = cp.asarray(src_arr)
gpu_dst_arr = gpuarray.to_gpu(dst_arr) gpu_dst_arr = cp.asarray(dst_arr)
kernel(src=gpu_src_arr, dst=gpu_dst_arr) kernel(src=gpu_src_arr, dst=gpu_dst_arr)
gpu_dst_arr.get(dst_arr) dst_arr = gpu_dst_arr.get()
stencil = np.array([[0, 1, 0], [1, 0, 1], [0, 1, 0]]) / 4.0 stencil = np.array([[0, 1, 0], [1, 0, 1], [0, 1, 0]]) / 4.0
reference = convolve(remove_ghost_layers(src_arr), stencil, mode='constant', cval=0.0) reference = convolve(remove_ghost_layers(src_arr), stencil, mode='constant', cval=0.0)
...@@ -61,6 +74,7 @@ def test_variable_sized_fields(): ...@@ -61,6 +74,7 @@ def test_variable_sized_fields():
def test_multiple_index_dimensions(): def test_multiple_index_dimensions():
pytest.importorskip('cupy')
"""Sums along the last axis of a numpy array""" """Sums along the last axis of a numpy array"""
src_size = (7, 6, 4) src_size = (7, 6, 4)
dst_size = src_size[:2] dst_size = src_size[:2]
...@@ -74,13 +88,14 @@ def test_multiple_index_dimensions(): ...@@ -74,13 +88,14 @@ def test_multiple_index_dimensions():
update_rule = Assignment(dst_field[0, 0], update_rule = Assignment(dst_field[0, 0],
sum([src_field[offset[0], offset[1]](i) for i in range(src_size[-1])])) sum([src_field[offset[0], offset[1]](i) for i in range(src_size[-1])]))
ast = create_cuda_kernel([update_rule]) config = CreateKernelConfig(target=Target.GPU)
kernel = make_python_function(ast) ast = create_kernel([update_rule], config=config)
kernel = ast.compile()
gpu_src_arr = gpuarray.to_gpu(src_arr) gpu_src_arr = cp.asarray(src_arr)
gpu_dst_arr = gpuarray.to_gpu(dst_arr) gpu_dst_arr = cp.asarray(dst_arr)
kernel(src=gpu_src_arr, dst=gpu_dst_arr) kernel(src=gpu_src_arr, dst=gpu_dst_arr)
gpu_dst_arr.get(dst_arr) dst_arr = gpu_dst_arr.get()
reference = np.zeros_like(dst_arr) reference = np.zeros_like(dst_arr)
gl = np.max(np.abs(np.array(offset, dtype=int))) gl = np.max(np.abs(np.array(offset, dtype=int)))
...@@ -92,6 +107,7 @@ def test_multiple_index_dimensions(): ...@@ -92,6 +107,7 @@ def test_multiple_index_dimensions():
def test_ghost_layer(): def test_ghost_layer():
pytest.importorskip('cupy')
size = (6, 5) size = (6, 5)
src_arr = np.ones(size) src_arr = np.ones(size)
dst_arr = np.zeros_like(src_arr) dst_arr = np.zeros_like(src_arr)
...@@ -100,13 +116,15 @@ def test_ghost_layer(): ...@@ -100,13 +116,15 @@ def test_ghost_layer():
update_rule = Assignment(dst_field[0, 0], src_field[0, 0]) update_rule = Assignment(dst_field[0, 0], src_field[0, 0])
ghost_layers = [(1, 2), (2, 1)] ghost_layers = [(1, 2), (2, 1)]
ast = create_cuda_kernel([update_rule], ghost_layers=ghost_layers, indexing_creator=LineIndexing)
kernel = make_python_function(ast)
gpu_src_arr = gpuarray.to_gpu(src_arr) config = CreateKernelConfig(target=Target.GPU, ghost_layers=ghost_layers, gpu_indexing="line")
gpu_dst_arr = gpuarray.to_gpu(dst_arr) ast = create_kernel(sympy_cse_on_assignment_list([update_rule]), config=config)
kernel = ast.compile()
gpu_src_arr = cp.asarray(src_arr)
gpu_dst_arr = cp.asarray(dst_arr)
kernel(src=gpu_src_arr, dst=gpu_dst_arr) kernel(src=gpu_src_arr, dst=gpu_dst_arr)
gpu_dst_arr.get(dst_arr) dst_arr = gpu_dst_arr.get()
reference = np.zeros_like(src_arr) reference = np.zeros_like(src_arr)
reference[ghost_layers[0][0]:-ghost_layers[0][1], ghost_layers[1][0]:-ghost_layers[1][1]] = 1 reference[ghost_layers[0][0]:-ghost_layers[0][1], ghost_layers[1][0]:-ghost_layers[1][1]] = 1
...@@ -114,25 +132,29 @@ def test_ghost_layer(): ...@@ -114,25 +132,29 @@ def test_ghost_layer():
def test_setting_value(): def test_setting_value():
pytest.importorskip('cupy')
arr_cpu = np.arange(25, dtype=np.float64).reshape(5, 5) arr_cpu = np.arange(25, dtype=np.float64).reshape(5, 5)
arr_gpu = gpuarray.to_gpu(arr_cpu) arr_gpu = cp.asarray(arr_cpu)
iteration_slice = make_slice[:, :] iteration_slice = make_slice[:, :]
f = Field.create_generic("f", 2) f = Field.create_generic("f", 2)
update_rule = [Assignment(f(0), sp.Symbol("value"))] update_rule = [Assignment(f(0), sp.Symbol("value"))]
ast = create_cuda_kernel(update_rule, iteration_slice=iteration_slice, indexing_creator=LineIndexing)
kernel = make_python_function(ast) config = CreateKernelConfig(target=Target.GPU, gpu_indexing="line", iteration_slice=iteration_slice)
ast = create_kernel(sympy_cse_on_assignment_list(update_rule), config=config)
kernel = ast.compile()
kernel(f=arr_gpu, value=np.float64(42.0)) kernel(f=arr_gpu, value=np.float64(42.0))
np.testing.assert_equal(arr_gpu.get(), np.ones((5, 5)) * 42.0) np.testing.assert_equal(arr_gpu.get(), np.ones((5, 5)) * 42.0)
def test_periodicity(): def test_periodicity():
from pystencils.gpucuda.periodicity import get_periodic_boundary_functor as periodic_gpu pytest.importorskip('cupy')
from pystencils.gpu.periodicity import get_periodic_boundary_functor as periodic_gpu
from pystencils.slicing import get_periodic_boundary_functor as periodic_cpu from pystencils.slicing import get_periodic_boundary_functor as periodic_cpu
arr_cpu = np.arange(50, dtype=np.float64).reshape(5, 5, 2) arr_cpu = np.arange(50, dtype=np.float64).reshape(5, 5, 2)
arr_gpu = gpuarray.to_gpu(arr_cpu) arr_gpu = cp.asarray(arr_cpu)
periodicity_stencil = [(1, 0), (-1, 0), (1, 1)] periodicity_stencil = [(1, 0), (-1, 0), (1, 1)]
periodic_gpu_kernel = periodic_gpu(periodicity_stencil, (5, 5), 1, 2) periodic_gpu_kernel = periodic_gpu(periodicity_stencil, (5, 5), 1, 2)
...@@ -141,7 +163,95 @@ def test_periodicity(): ...@@ -141,7 +163,95 @@ def test_periodicity():
cpu_result = np.copy(arr_cpu) cpu_result = np.copy(arr_cpu)
periodic_cpu_kernel(cpu_result) periodic_cpu_kernel(cpu_result)
gpu_result = np.copy(arr_cpu)
periodic_gpu_kernel(pdfs=arr_gpu) periodic_gpu_kernel(pdfs=arr_gpu)
arr_gpu.get(gpu_result) gpu_result = arr_gpu.get()
np.testing.assert_equal(cpu_result, gpu_result) np.testing.assert_equal(cpu_result, gpu_result)
@pytest.mark.parametrize("device_number", device_numbers)
def test_block_indexing(device_number):
pytest.importorskip('cupy')
f = fields("f: [3D]")
s = normalize_slice(make_slice[:, :, :], f.spatial_shape)
bi = BlockIndexing(s, f.layout, block_size=(16, 8, 2),
permute_block_size_dependent_on_layout=False)
assert bi.call_parameters((3, 2, 32))['block'] == (3, 2, 32)
assert bi.call_parameters((32, 2, 32))['block'] == (16, 2, 8)
bi = BlockIndexing(s, f.layout, block_size=(32, 1, 1),
permute_block_size_dependent_on_layout=False)
assert bi.call_parameters((1, 16, 16))['block'] == (1, 16, 2)
bi = BlockIndexing(s, f.layout, block_size=(16, 8, 2),
maximum_block_size="auto", device_number=device_number)
# This function should be used if number of needed registers is known. Can be determined with func.num_regs
registers_per_thread = 1000
blocks = bi.limit_block_size_by_register_restriction([1024, 1024, 1], registers_per_thread)
if cp.cuda.runtime.is_hip:
max_registers_per_block = cp.cuda.runtime.deviceGetAttribute(71, device_number)
else:
device = cp.cuda.Device(device_number)
da = device.attributes
max_registers_per_block = da.get("MaxRegistersPerBlock")
assert np.prod(blocks) * registers_per_thread < max_registers_per_block
@pytest.mark.parametrize('gpu_indexing', ("block", "line"))
@pytest.mark.parametrize('layout', ("C", "F"))
@pytest.mark.parametrize('shape', ((5, 5, 5, 5), (3, 17, 387, 4), (23, 44, 21, 11)))
def test_four_dimensional_kernel(gpu_indexing, layout, shape):
pytest.importorskip('cupy')
n_elements = np.prod(shape)
arr_cpu = np.arange(n_elements, dtype=np.float64).reshape(shape, order=layout)
arr_gpu = cp.asarray(arr_cpu)
iteration_slice = make_slice[:, :, :, :]
f = Field.create_from_numpy_array("f", arr_cpu)
update_rule = [Assignment(f.center, sp.Symbol("value"))]
config = CreateKernelConfig(target=Target.GPU, gpu_indexing=gpu_indexing, iteration_slice=iteration_slice)
ast = create_kernel(update_rule, config=config)
kernel = ast.compile()
kernel(f=arr_gpu, value=np.float64(42.0))
np.testing.assert_equal(arr_gpu.get(), np.ones(shape) * 42.0)
@pytest.mark.parametrize('start', (1, 5))
@pytest.mark.parametrize('end', (-1, -2, -3, -4))
@pytest.mark.parametrize('step', (1, 2, 3, 4))
@pytest.mark.parametrize('shape', ([55, 60], [77, 101, 80], [44, 64, 66]))
def test_guards_with_iteration_slices(start, end, step, shape):
iter_slice = tuple([slice(start, end, step)] * len(shape))
kernel_config_gpu = CreateKernelConfig(target=Target.GPU, iteration_slice=iter_slice)
field_1 = fields(f"f(1) : double{list(shape)}")
assignment = Assignment(field_1.center, 1)
ast = create_kernel(assignment, config=kernel_config_gpu)
code_str = get_code_str(ast)
test_strings = list()
iteration_ranges = list()
for i, s in enumerate(iter_slice):
e = ((shape[i] + end) - s.start) / s.step
e = math.ceil(e) + s.start
test_strings.append(f"{s.start} < {e}")
a = s.start
counter = 0
while a < e:
a += 1
counter += 1
iteration_ranges.append(counter)
# check if the expected if statement is in the GPU code
for s in test_strings:
assert s in code_str
# check if these bounds lead to same lengths as the range function would produce
for i in range(len(iter_slice)):
assert iteration_ranges[i] == len(range(iter_slice[i].start, shape[i] + end, iter_slice[i].step))
import pytest
import platform
import numpy as np
import pystencils as ps
@pytest.mark.parametrize('target', (ps.Target.CPU, ps.Target.GPU))
def test_half_precison(target):
if target == ps.Target.CPU:
if not platform.machine() in ['arm64', 'aarch64']:
pytest.xfail("skipping half precision test on non arm platform")
if 'clang' not in ps.cpu.cpujit.get_compiler_config()['command']:
pytest.xfail("skipping half precision because clang compiler is not used")
if target == ps.Target.GPU:
pytest.importorskip("cupy")
dh = ps.create_data_handling(domain_size=(10, 10), default_target=target)
f1 = dh.add_array("f1", values_per_cell=1, dtype=np.float16)
dh.fill("f1", 1.0, ghost_layers=True)
f2 = dh.add_array("f2", values_per_cell=1, dtype=np.float16)
dh.fill("f2", 2.0, ghost_layers=True)
f3 = dh.add_array("f3", values_per_cell=1, dtype=np.float16)
dh.fill("f3", 0.0, ghost_layers=True)
up = ps.Assignment(f3.center, f1.center + 2.1 * f2.center)
config = ps.CreateKernelConfig(target=dh.default_target, default_number_float=np.float32)
ast = ps.create_kernel(up, config=config)
kernel = ast.compile()
dh.run_kernel(kernel)
dh.all_to_cpu()
assert np.all(dh.cpu_arrays[f3.name] == 5.2)
assert dh.cpu_arrays[f3.name].dtype == np.float16
"""
"""
import pytest
from pystencils.astnodes import Block
from pystencils.backends.cbackend import CustomCodeNode, get_headers
def test_headers_have_quotes_or_brackets():
class ErrorNode1(CustomCodeNode):
def __init__(self):
super().__init__("", [], [])
self.headers = ["iostream"]
class ErrorNode2(CustomCodeNode):
headers = ["<iostream>", "foo"]
def __init__(self):
super().__init__("", [], [])
self.headers = ["<iostream>", "foo"]
class OkNode3(CustomCodeNode):
def __init__(self):
super().__init__("", [], [])
self.headers = ["<iostream>", '"foo"']
with pytest.raises(AssertionError, match='.* does not follow the pattern .*'):
get_headers(Block([ErrorNode1()]))
with pytest.raises(AssertionError, match='.* does not follow the pattern .*'):
get_headers(ErrorNode2())
get_headers(OkNode3())
import sympy as sp
import numpy as np
import pytest
import pystencils as ps
from pystencils import Assignment, Field, CreateKernelConfig, create_kernel, Target
from pystencils.transformations import filtered_tree_iteration
from pystencils.typing import BasicType, FieldPointerSymbol, PointerType, TypedSymbol
@pytest.mark.parametrize('target', [ps.Target.CPU, ps.Target.GPU])
def test_indexed_kernel(target):
if target == Target.GPU:
pytest.importorskip("cupy")
import cupy as cp
arr = np.zeros((3, 4))
dtype = np.dtype([('x', int), ('y', int), ('value', arr.dtype)])
index_arr = np.zeros((3,), dtype=dtype)
index_arr[0] = (0, 2, 3.0)
index_arr[1] = (1, 3, 42.0)
index_arr[2] = (2, 1, 5.0)
indexed_field = Field.create_from_numpy_array('index', index_arr)
normal_field = Field.create_from_numpy_array('f', arr)
update_rule = Assignment(normal_field[0, 0], indexed_field('value'))
config = CreateKernelConfig(target=target, index_fields=[indexed_field])
ast = create_kernel([update_rule], config=config)
kernel = ast.compile()
if target == Target.CPU:
kernel(f=arr, index=index_arr)
else:
gpu_arr = cp.asarray(arr)
gpu_index_arr = cp.ndarray(index_arr.shape, dtype=index_arr.dtype)
gpu_index_arr.set(index_arr)
kernel(f=gpu_arr, index=gpu_index_arr)
arr = gpu_arr.get()
for i in range(index_arr.shape[0]):
np.testing.assert_allclose(arr[index_arr[i]['x'], index_arr[i]['y']], index_arr[i]['value'], atol=1e-13)
@pytest.mark.parametrize('index_size', ("fixed", "variable"))
@pytest.mark.parametrize('array_size', ("3D", "2D", "10, 12", "13, 17, 19"))
@pytest.mark.parametrize('target', (ps.Target.CPU, ps.Target.GPU))
@pytest.mark.parametrize('dtype', ("float64", "float32"))
def test_indexed_domain_kernel(index_size, array_size, target, dtype):
dtype = BasicType(dtype)
f = ps.fields(f'f(1): {dtype.numpy_dtype.name}[{array_size}]')
g = ps.fields(f'g(1): {dtype.numpy_dtype.name}[{array_size}]')
index = TypedSymbol("index", dtype=BasicType(np.int16))
if index_size == "variable":
index_src = TypedSymbol("_size_src", dtype=BasicType(np.int16))
index_dst = TypedSymbol("_size_dst", dtype=BasicType(np.int16))
else:
index_src = 16
index_dst = 16
pointer_type = PointerType(dtype, const=False, restrict=True, double_pointer=True)
const_pointer_type = PointerType(dtype, const=True, restrict=True, double_pointer=True)
src = sp.IndexedBase(TypedSymbol(f"_data_{f.name}", dtype=const_pointer_type), shape=index_src)
dst = sp.IndexedBase(TypedSymbol(f"_data_{g.name}", dtype=pointer_type), shape=index_dst)
update_rule = [ps.Assignment(FieldPointerSymbol("f", dtype, const=True), src[index + 1]),
ps.Assignment(FieldPointerSymbol("g", dtype, const=False), dst[index + 1]),
ps.Assignment(g.center, f.center)]
ast = ps.create_kernel(update_rule, target=target)
code = ps.get_code_str(ast)
assert f"const {dtype.c_name} * RESTRICT _data_f = (({dtype.c_name} * RESTRICT const)(_data_f[index + 1]));" in code
assert f"{dtype.c_name} * RESTRICT _data_g = (({dtype.c_name} * RESTRICT )(_data_g[index + 1]));" in code
if target == Target.CPU:
assert code.count("for") == f.spatial_dimensions + 1
import numpy as np import numpy as np
from pystencils import show_code
from pystencils.transformations import move_constants_before_loop, make_loop_over_domain, resolve_field_accesses from pystencils import get_code_obj
from pystencils.field import Field from pystencils.astnodes import Block, KernelFunction, SympyAssignment
from pystencils.astnodes import SympyAssignment, Block
from pystencils.cpu import make_python_function from pystencils.cpu import make_python_function
from pystencils.field import Field
from pystencils.enums import Target, Backend
from pystencils.transformations import (
make_loop_over_domain, move_constants_before_loop, resolve_field_accesses)
def test_jacobi_fixed_field_size(): def test_jacobi_fixed_field_size():
...@@ -19,7 +22,8 @@ def test_jacobi_fixed_field_size(): ...@@ -19,7 +22,8 @@ def test_jacobi_fixed_field_size():
jacobi = SympyAssignment(d[0, 0], (f[1, 0] + f[-1, 0] + f[0, 1] + f[0, -1]) / 4) jacobi = SympyAssignment(d[0, 0], (f[1, 0] + f[-1, 0] + f[0, 1] + f[0, -1]) / 4)
body = Block([jacobi]) body = Block([jacobi])
ast_node = make_loop_over_domain(body, "kernel") loop_node, gl_info = make_loop_over_domain(body)
ast_node = KernelFunction(loop_node, Target.CPU, Backend.C, make_python_function, ghost_layers=gl_info)
resolve_field_accesses(ast_node) resolve_field_accesses(ast_node)
move_constants_before_loop(ast_node) move_constants_before_loop(ast_node)
...@@ -28,12 +32,12 @@ def test_jacobi_fixed_field_size(): ...@@ -28,12 +32,12 @@ def test_jacobi_fixed_field_size():
dst_field_py[x, y] = 0.25 * (src_field_py[x - 1, y] + src_field_py[x + 1, y] + dst_field_py[x, y] = 0.25 * (src_field_py[x - 1, y] + src_field_py[x + 1, y] +
src_field_py[x, y - 1] + src_field_py[x, y + 1]) src_field_py[x, y - 1] + src_field_py[x, y + 1])
kernel = make_python_function(ast_node) kernel = ast_node.compile()
kernel(f=src_field_c, d=dst_field_c) kernel(f=src_field_c, d=dst_field_c)
error = np.sum(np.abs(dst_field_py - dst_field_c)) error = np.sum(np.abs(dst_field_py - dst_field_c))
np.testing.assert_allclose(error, 0.0, atol=1e-13) np.testing.assert_allclose(error, 0.0, atol=1e-13)
code_display = show_code(ast_node) code_display = get_code_obj(ast_node)
assert 'for' in str(code_display) assert 'for' in str(code_display)
assert 'for' in code_display._repr_html_() assert 'for' in code_display._repr_html_()
...@@ -44,7 +48,8 @@ def test_jacobi_variable_field_size(): ...@@ -44,7 +48,8 @@ def test_jacobi_variable_field_size():
d = Field.create_generic("d", 3) d = Field.create_generic("d", 3)
jacobi = SympyAssignment(d[0, 0, 0], (f[1, 0, 0] + f[-1, 0, 0] + f[0, 1, 0] + f[0, -1, 0]) / 4) jacobi = SympyAssignment(d[0, 0, 0], (f[1, 0, 0] + f[-1, 0, 0] + f[0, 1, 0] + f[0, -1, 0]) / 4)
body = Block([jacobi]) body = Block([jacobi])
ast_node = make_loop_over_domain(body, "kernel") loop_node, gl_info = make_loop_over_domain(body)
ast_node = KernelFunction(loop_node, Target.CPU, Backend.C, make_python_function, ghost_layers=gl_info)
resolve_field_accesses(ast_node) resolve_field_accesses(ast_node)
move_constants_before_loop(ast_node) move_constants_before_loop(ast_node)
...@@ -53,13 +58,13 @@ def test_jacobi_variable_field_size(): ...@@ -53,13 +58,13 @@ def test_jacobi_variable_field_size():
dst_field_c = np.zeros(size) dst_field_c = np.zeros(size)
dst_field_py = np.zeros(size) dst_field_py = np.zeros(size)
for x in range(1, size[0]-1): for x in range(1, size[0] - 1):
for y in range(1, size[1]-1): for y in range(1, size[1] - 1):
for z in range(1, size[2]-1): for z in range(1, size[2] - 1):
dst_field_py[x, y, z] = 0.25 * (src_field_py[x - 1, y, z] + src_field_py[x + 1, y, z] + dst_field_py[x, y, z] = 0.25 * (src_field_py[x - 1, y, z] + src_field_py[x + 1, y, z] +
src_field_py[x, y - 1, z] + src_field_py[x, y + 1, z]) src_field_py[x, y - 1, z] + src_field_py[x, y + 1, z])
kernel = make_python_function(ast_node) kernel = ast_node.compile()
kernel(f=src_field_c, d=dst_field_c) kernel(f=src_field_c, d=dst_field_c)
error = np.sum(np.abs(dst_field_py-dst_field_c)) error = np.sum(np.abs(dst_field_py - dst_field_c))
np.testing.assert_allclose(error, 0.0, atol=1e-13) np.testing.assert_allclose(error, 0.0, atol=1e-13)