Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Showing
with 1560 additions and 58 deletions
import pytest
import sympy as sp
import numpy
import pystencils
from pystencils.datahandling import create_data_handling
@pytest.mark.parametrize('dtype', ["float64", "float32"])
@pytest.mark.parametrize('sympy_function', [sp.Min, sp.Max])
def test_max(dtype, sympy_function):
dh = create_data_handling(domain_size=(10, 10), periodicity=True)
x = dh.add_array('x', values_per_cell=1, dtype=dtype)
dh.fill("x", 0.0, ghost_layers=True)
y = dh.add_array('y', values_per_cell=1, dtype=dtype)
dh.fill("y", 1.0, ghost_layers=True)
z = dh.add_array('z', values_per_cell=1, dtype=dtype)
dh.fill("z", 2.0, ghost_layers=True)
config = pystencils.CreateKernelConfig(default_number_float=dtype)
# test sp.Max with one argument
assignment_1 = pystencils.Assignment(x.center, sympy_function(y.center + 3.3))
ast_1 = pystencils.create_kernel(assignment_1, config=config)
kernel_1 = ast_1.compile()
# pystencils.show_code(ast_1)
# test sp.Max with two arguments
assignment_2 = pystencils.Assignment(x.center, sympy_function(0.5, y.center - 1.5))
ast_2 = pystencils.create_kernel(assignment_2, config=config)
kernel_2 = ast_2.compile()
# pystencils.show_code(ast_2)
# test sp.Max with many arguments
assignment_3 = pystencils.Assignment(x.center, sympy_function(z.center, 4.5, y.center - 1.5, y.center + z.center))
ast_3 = pystencils.create_kernel(assignment_3, config=config)
kernel_3 = ast_3.compile()
# pystencils.show_code(ast_3)
if sympy_function is sp.Max:
results = [4.3, 0.5, 4.5]
else:
results = [4.3, -0.5, -0.5]
dh.run_kernel(kernel_1)
assert numpy.all(dh.gather_array('x') == results[0])
dh.run_kernel(kernel_2)
assert numpy.all(dh.gather_array('x') == results[1])
dh.run_kernel(kernel_3)
assert numpy.all(dh.gather_array('x') == results[2])
@pytest.mark.parametrize('dtype', ["int64", 'int32'])
@pytest.mark.parametrize('sympy_function', [sp.Min, sp.Max])
def test_max_integer(dtype, sympy_function):
dh = create_data_handling(domain_size=(10, 10), periodicity=True)
x = dh.add_array('x', values_per_cell=1, dtype=dtype)
dh.fill("x", 0, ghost_layers=True)
y = dh.add_array('y', values_per_cell=1, dtype=dtype)
dh.fill("y", 1, ghost_layers=True)
z = dh.add_array('z', values_per_cell=1, dtype=dtype)
dh.fill("z", 2, ghost_layers=True)
config = pystencils.CreateKernelConfig(default_number_int=dtype)
# test sp.Max with one argument
assignment_1 = pystencils.Assignment(x.center, sympy_function(y.center + 3))
ast_1 = pystencils.create_kernel(assignment_1, config=config)
kernel_1 = ast_1.compile()
# pystencils.show_code(ast_1)
# test sp.Max with two arguments
assignment_2 = pystencils.Assignment(x.center, sympy_function(1, y.center - 1))
ast_2 = pystencils.create_kernel(assignment_2, config=config)
kernel_2 = ast_2.compile()
# pystencils.show_code(ast_2)
# test sp.Max with many arguments
assignment_3 = pystencils.Assignment(x.center, sympy_function(z.center, 4, y.center - 1, y.center + z.center))
ast_3 = pystencils.create_kernel(assignment_3, config=config)
kernel_3 = ast_3.compile()
# pystencils.show_code(ast_3)
if sympy_function is sp.Max:
results = [4, 1, 4]
else:
results = [4, 0, 0]
dh.run_kernel(kernel_1)
assert numpy.all(dh.gather_array('x') == results[0])
dh.run_kernel(kernel_2)
assert numpy.all(dh.gather_array('x') == results[1])
dh.run_kernel(kernel_3)
assert numpy.all(dh.gather_array('x') == results[2])
import pytest
import pystencils.config
import sympy
import pystencils as ps
from pystencils.typing import CastFunc, create_type
@pytest.mark.parametrize('target', (ps.Target.CPU, ps.Target.GPU))
def test_abs(target):
x, y, z = ps.fields('x, y, z: float64[2d]')
default_int_type = create_type('int64')
assignments = ps.AssignmentCollection({x[0, 0]: sympy.Abs(CastFunc(y[0, 0], default_int_type))})
config = pystencils.config.CreateKernelConfig(target=target)
ast = ps.create_kernel(assignments, config=config)
code = ps.get_code_str(ast)
print(code)
assert 'fabs(' not in code
"""
Test of pystencils.data_types.address_of
"""
import pytest
import pystencils
from pystencils.typing import PointerType, CastFunc, BasicType
from pystencils.functions import AddressOf
from pystencils.simp.simplifications import sympy_cse
import sympy as sp
def test_address_of():
x, y = pystencils.fields('x, y: int64[2d]')
s = pystencils.TypedSymbol('s', PointerType(BasicType('int64')))
assert AddressOf(x[0, 0]).canonical() == x[0, 0]
assert AddressOf(x[0, 0]).dtype == PointerType(x[0, 0].dtype, restrict=True)
with pytest.raises(ValueError):
assert AddressOf(sp.Symbol("a")).dtype
assignments = pystencils.AssignmentCollection({
s: AddressOf(x[0, 0]),
y[0, 0]: CastFunc(s, BasicType('int64'))
})
kernel = pystencils.create_kernel(assignments).compile()
# pystencils.show_code(kernel.ast)
assignments = pystencils.AssignmentCollection({
y[0, 0]: CastFunc(AddressOf(x[0, 0]), BasicType('int64'))
})
kernel = pystencils.create_kernel(assignments).compile()
# pystencils.show_code(kernel.ast)
def test_address_of_with_cse():
x, y = pystencils.fields('x, y: int64[2d]')
assignments = pystencils.AssignmentCollection({
x[0, 0]: CastFunc(AddressOf(x[0, 0]), BasicType('int64')) + 1
})
kernel = pystencils.create_kernel(assignments).compile()
# pystencils.show_code(kernel.ast)
assignments_cse = sympy_cse(assignments)
kernel = pystencils.create_kernel(assignments_cse).compile()
# pystencils.show_code(kernel.ast)
import pytest
from pystencils import create_data_handling
from pystencils.alignedarray import *
from pystencils.field import create_numpy_array_with_layout
......@@ -11,45 +13,45 @@ def is_aligned(arr, alignment, byte_offset=0):
return rest == 0
def test_1d_arrays():
for alignment in [8, 8*4]:
for shape in [17, 16, (16, 16), (17, 17), (18, 18), (19, 19)]:
arrays = [
aligned_zeros(shape, alignment),
aligned_ones(shape, alignment),
aligned_empty(shape, alignment),
]
for arr in arrays:
assert is_aligned(arr, alignment)
@pytest.mark.parametrize("alignment", [8, 8*4, True])
@pytest.mark.parametrize("shape", [17, 16, (16, 16), (17, 17), (18, 18), (19, 19)])
def test_1d_arrays(alignment, shape):
arrays = [
aligned_zeros(shape, alignment),
aligned_ones(shape, alignment),
aligned_empty(shape, alignment),
]
for arr in arrays:
assert is_aligned(arr, alignment)
def test_3d_arrays():
for order in ('C', 'F'):
for alignment in [8, 8*4]:
for shape in [(16, 16), (17, 17), (18, 18), (19, 19)]:
arrays = [
aligned_zeros(shape, alignment, order=order),
aligned_ones(shape, alignment, order=order),
aligned_empty(shape, alignment, order=order),
]
for arr in arrays:
assert is_aligned(arr, alignment)
if order == 'C':
assert is_aligned(arr[1], alignment)
assert is_aligned(arr[5], alignment)
else:
assert is_aligned(arr[..., 1], alignment)
assert is_aligned(arr[..., 5], alignment)
@pytest.mark.parametrize("order", ['C', 'F'])
@pytest.mark.parametrize("alignment", [8, 8*4, True])
@pytest.mark.parametrize("shape", [(16, 16), (17, 17), (18, 18), (19, 19)])
def test_3d_arrays(order, alignment, shape):
arrays = [
aligned_zeros(shape, alignment, order=order),
aligned_ones(shape, alignment, order=order),
aligned_empty(shape, alignment, order=order),
]
for arr in arrays:
assert is_aligned(arr, alignment)
if order == 'C':
assert is_aligned(arr[1], alignment)
assert is_aligned(arr[5], alignment)
else:
assert is_aligned(arr[..., 1], alignment)
assert is_aligned(arr[..., 5], alignment)
def test_data_handling():
for parallel in (False, True):
for tries in range(16): # try a few times, since we might get lucky and get randomly a correct alignment
dh = create_data_handling((6, 7), default_ghost_layers=1, parallel=parallel)
dh.add_array('test', alignment=8 * 4)
for b in dh.iterate(ghost_layers=True, inner_ghost_layers=True):
arr = b['test']
assert is_aligned(arr[1:, 3:], 8*4)
@pytest.mark.parametrize("parallel", [False, True])
def test_data_handling(parallel):
for tries in range(16): # try a few times, since we might get lucky and get randomly a correct alignment
dh = create_data_handling((6, 7), default_ghost_layers=1, parallel=parallel)
dh.add_array('test', alignment=8 * 4, values_per_cell=1)
for b in dh.iterate(ghost_layers=True, inner_ghost_layers=True):
arr = b['test']
assert is_aligned(arr[1:, 3:], 8*4)
def test_alignment_of_different_layouts():
......@@ -57,13 +59,13 @@ def test_alignment_of_different_layouts():
byte_offset = 8
for tries in range(16): # try a few times, since we might get lucky and get randomly a correct alignment
arr = create_numpy_array_with_layout((3, 4, 5), layout=(0, 1, 2),
alignment=True, byte_offset=byte_offset)
alignment=8*4, byte_offset=byte_offset)
assert is_aligned(arr[offset, ...], 8*4, byte_offset)
arr = create_numpy_array_with_layout((3, 4, 5), layout=(2, 1, 0),
alignment=True, byte_offset=byte_offset)
alignment=8*4, byte_offset=byte_offset)
assert is_aligned(arr[..., offset], 8*4, byte_offset)
arr = create_numpy_array_with_layout((3, 4, 5), layout=(2, 0, 1),
alignment=True, byte_offset=byte_offset)
alignment=8*4, byte_offset=byte_offset)
assert is_aligned(arr[:, 0, :], 8*4, byte_offset)
import pytest
import sympy as sp
import pystencils as ps
from pystencils import Assignment, AssignmentCollection
from pystencils.astnodes import Conditional
from pystencils.simp.assignment_collection import SymbolGen
a, b, c = sp.symbols("a b c")
x, y, z, t = sp.symbols("x y z t")
symbol_gen = SymbolGen("a")
f = ps.fields("f(2) : [2D]")
d = ps.fields("d(2) : [2D]")
def test_assignment_collection():
ac = AssignmentCollection([Assignment(z, x + y)],
[], subexpression_symbol_generator=symbol_gen)
lhs = ac.add_subexpression(t)
assert lhs == sp.Symbol("a_0")
ac.subexpressions.append(Assignment(t, 3))
ac.topological_sort(sort_main_assignments=False, sort_subexpressions=True)
assert ac.subexpressions[0].lhs == t
assert ac.new_with_inserted_subexpression(sp.Symbol("not_defined")) == ac
ac_inserted = ac.new_with_inserted_subexpression(t)
ac_inserted2 = ac.new_without_subexpressions({lhs})
assert all(a == b for a, b in zip(ac_inserted.all_assignments, ac_inserted2.all_assignments))
print(ac_inserted)
assert ac_inserted.subexpressions[0] == Assignment(lhs, 3)
assert 'a_0' in str(ac_inserted)
assert '<table' in ac_inserted._repr_html_()
def test_free_and_defined_symbols():
ac = AssignmentCollection([Assignment(z, x + y), Conditional(t > 0, Assignment(a, b+1), Assignment(a, b+2))],
[], subexpression_symbol_generator=symbol_gen)
print(ac)
print(ac.__repr__)
def test_vector_assignments():
"""From #17 (https://i10git.cs.fau.de/pycodegen/pystencils/issues/17)"""
assignments = ps.Assignment(sp.Matrix([a, b, c]), sp.Matrix([1, 2, 3]))
print(assignments)
def test_wrong_vector_assignments():
"""From #17 (https://i10git.cs.fau.de/pycodegen/pystencils/issues/17)"""
with pytest.raises(AssertionError,
match=r'Matrix(.*) and Matrix(.*) must have same length when performing vector assignment!'):
ps.Assignment(sp.Matrix([a, b]), sp.Matrix([1, 2, 3]))
def test_vector_assignment_collection():
"""From #17 (https://i10git.cs.fau.de/pycodegen/pystencils/issues/17)"""
y_m, x_m = sp.Matrix([a, b, c]), sp.Matrix([1, 2, 3])
assignments = ps.AssignmentCollection({y_m: x_m})
print(assignments)
assignments = ps.AssignmentCollection([ps.Assignment(y_m, x_m)])
print(assignments)
def test_new_with_substitutions():
a1 = ps.Assignment(f[0, 0](0), a * b)
a2 = ps.Assignment(f[0, 0](1), b * c)
ac = ps.AssignmentCollection([a1, a2], subexpressions=[])
subs_dict = {f[0, 0](0): d[0, 0](0), f[0, 0](1): d[0, 0](1)}
subs_ac = ac.new_with_substitutions(subs_dict,
add_substitutions_as_subexpressions=False,
substitute_on_lhs=True,
sort_topologically=True)
assert subs_ac.main_assignments[0].lhs == d[0, 0](0)
assert subs_ac.main_assignments[1].lhs == d[0, 0](1)
subs_ac = ac.new_with_substitutions(subs_dict,
add_substitutions_as_subexpressions=False,
substitute_on_lhs=False,
sort_topologically=True)
assert subs_ac.main_assignments[0].lhs == f[0, 0](0)
assert subs_ac.main_assignments[1].lhs == f[0, 0](1)
subs_dict = {a * b: sp.symbols('xi')}
subs_ac = ac.new_with_substitutions(subs_dict,
add_substitutions_as_subexpressions=False,
substitute_on_lhs=False,
sort_topologically=True)
assert subs_ac.main_assignments[0].rhs == sp.symbols('xi')
assert len(subs_ac.subexpressions) == 0
subs_ac = ac.new_with_substitutions(subs_dict,
add_substitutions_as_subexpressions=True,
substitute_on_lhs=False,
sort_topologically=True)
assert subs_ac.main_assignments[0].rhs == sp.symbols('xi')
assert len(subs_ac.subexpressions) == 1
assert subs_ac.subexpressions[0].lhs == sp.symbols('xi')
def test_copy():
a1 = ps.Assignment(f[0, 0](0), a * b)
a2 = ps.Assignment(f[0, 0](1), b * c)
ac = ps.AssignmentCollection([a1, a2], subexpressions=[])
ac2 = ac.copy()
assert ac2 == ac
def test_set_expressions():
a1 = ps.Assignment(f[0, 0](0), a * b)
a2 = ps.Assignment(f[0, 0](1), b * c)
ac = ps.AssignmentCollection([a1, a2], subexpressions=[])
ac.set_main_assignments_from_dict({d[0, 0](0): b * c})
assert len(ac.main_assignments) == 1
assert ac.main_assignments[0] == ps.Assignment(d[0, 0](0), b * c)
ac.set_sub_expressions_from_dict({sp.symbols('xi'): a * b})
assert len(ac.subexpressions) == 1
assert ac.subexpressions[0] == ps.Assignment(sp.symbols('xi'), a * b)
ac = ac.new_without_subexpressions(subexpressions_to_keep={sp.symbols('xi')})
assert ac.subexpressions[0] == ps.Assignment(sp.symbols('xi'), a * b)
ac = ac.new_without_unused_subexpressions()
assert len(ac.subexpressions) == 0
ac2 = ac.new_without_subexpressions()
assert ac == ac2
def test_free_and_bound_symbols():
a1 = ps.Assignment(a, d[0, 0](0))
a2 = ps.Assignment(f[0, 0](1), b * c)
ac = ps.AssignmentCollection([a2], subexpressions=[a1])
assert f[0, 0](1) in ac.bound_symbols
assert d[0, 0](0) in ac.free_symbols
def test_new_merged():
a1 = ps.Assignment(a, b * c)
a2 = ps.Assignment(a, x * y)
a3 = ps.Assignment(t, x ** 2)
# main assignments
a4 = ps.Assignment(f[0, 0](0), a)
a5 = ps.Assignment(d[0, 0](0), a)
ac = ps.AssignmentCollection([a4], subexpressions=[a1])
ac2 = ps.AssignmentCollection([a5], subexpressions=[a2, a3])
merged_ac = ac.new_merged(ac2)
assert len(merged_ac.subexpressions) == 3
assert len(merged_ac.main_assignments) == 2
assert ps.Assignment(sp.symbols('xi_0'), x * y) in merged_ac.subexpressions
assert ps.Assignment(d[0, 0](0), sp.symbols('xi_0')) in merged_ac.main_assignments
assert a1 in merged_ac.subexpressions
assert a3 in merged_ac.subexpressions
a1 = ps.Assignment(a, 20)
a2 = ps.Assignment(a, 10)
acommon = ps.Assignment(b, a)
# main assignments
a3 = ps.Assignment(f[0, 0](0), b)
a4 = ps.Assignment(d[0, 0](0), b)
ac = ps.AssignmentCollection([a3], subexpressions=[a1, acommon])
ac2 = ps.AssignmentCollection([a4], subexpressions=[a2, acommon])
merged_ac = ac.new_merged(ac2).new_without_subexpressions()
assert ps.Assignment(f[0, 0](0), 20) in merged_ac.main_assignments
assert ps.Assignment(d[0, 0](0), 10) in merged_ac.main_assignments
import pytest
import sys
import pystencils.config
import sympy as sp
import pystencils as ps
from pystencils import Assignment
from pystencils.astnodes import Block, LoopOverCoordinate, SkipIteration, SympyAssignment
dst = ps.fields('dst(8): double[2D]')
s = sp.symbols('s_:8')
x = sp.symbols('x')
y = sp.symbols('y')
python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
def test_kernel_function():
assignments = [
Assignment(dst[0, 0](0), s[0]),
Assignment(x, dst[0, 0](2))
]
ast_node = ps.create_kernel(assignments)
assert ast_node.target == ps.Target.CPU
assert ast_node.backend == ps.Backend.C
# symbols_defined and undefined_symbols will always return an emtpy set
assert ast_node.symbols_defined == set()
assert ast_node.undefined_symbols == set()
assert ast_node.fields_written == {dst}
assert ast_node.fields_read == {dst}
def test_skip_iteration():
# skip iteration is an object which should give back empty data structures.
skipped = SkipIteration()
assert skipped.args == []
assert skipped.symbols_defined == set()
assert skipped.undefined_symbols == set()
def test_block():
assignments = [
Assignment(dst[0, 0](0), s[0]),
Assignment(x, dst[0, 0](2))
]
bl = Block(assignments)
assert bl.symbols_defined == {dst[0, 0](0), dst[0, 0](2), s[0], x}
bl.append([Assignment(y, 10)])
assert bl.symbols_defined == {dst[0, 0](0), dst[0, 0](2), s[0], x, y}
assert len(bl.args) == 3
list_iterator = iter([Assignment(s[1], 11)])
bl.insert_front(list_iterator)
assert bl.args[0] == Assignment(s[1], 11)
def test_loop_over_coordinate():
assignments = [
Assignment(dst[0, 0](0), s[0]),
Assignment(x, dst[0, 0](2))
]
body = Block(assignments)
loop = LoopOverCoordinate(body, coordinate_to_loop_over=0, start=0, stop=10, step=1)
assert loop.body == body
new_body = Block([assignments[0]])
loop = loop.new_loop_with_different_body(new_body)
assert loop.body == new_body
assert loop.start == 0
assert loop.stop == 10
assert loop.step == 1
loop.replace(loop.start, 2)
loop.replace(loop.stop, 20)
loop.replace(loop.step, 2)
assert loop.start == 2
assert loop.stop == 20
assert loop.step == 2
import pytest
import pystencils as ps
@pytest.mark.parametrize('target', [ps.Target.CPU, ps.Target.GPU])
def test_add_augmented_assignment(target):
if target == ps.Target.GPU:
pytest.importorskip("cupy")
domain_size = (5, 5)
dh = ps.create_data_handling(domain_size=domain_size, periodicity=True, default_target=target)
f = dh.add_array("f", values_per_cell=1)
dh.fill(f.name, 0.0)
g = dh.add_array("g", values_per_cell=1)
dh.fill(g.name, 1.0)
up = ps.AddAugmentedAssignment(f.center, g.center)
config = ps.CreateKernelConfig(target=dh.default_target)
ast = ps.create_kernel(up, config=config)
kernel = ast.compile()
for i in range(10):
dh.run_kernel(kernel)
if target == ps.Target.GPU:
dh.all_to_cpu()
result = dh.gather_array(f.name)
for x in range(domain_size[0]):
for y in range(domain_size[1]):
assert result[x, y] == 10
import pytest
from pystencils import Assignment, CreateKernelConfig, Target, fields, create_kernel, get_code_str
@pytest.mark.parametrize('target', (Target.CPU, Target.GPU))
def test_intermediate_base_pointer(target):
x = fields(f'x: double[3d]')
y = fields(f'y: double[3d]')
update = Assignment(x.center, y.center)
config = CreateKernelConfig(base_pointer_specification=[], target=target)
ast = create_kernel(update, config=config)
code = get_code_str(ast)
# no intermediate base pointers are created
assert "_data_x[_stride_x_0*ctr_0 + _stride_x_1*ctr_1 + _stride_x_2*ctr_2] = " \
"_data_y[_stride_y_0*ctr_0 + _stride_y_1*ctr_1 + _stride_y_2*ctr_2];" in code
config = CreateKernelConfig(base_pointer_specification=[[0]], target=target)
ast = create_kernel(update, config=config)
code = get_code_str(ast)
# intermediate base pointers for y and z
assert "double * RESTRICT _data_x_10_20 = _data_x + _stride_x_1*ctr_1 + _stride_x_2*ctr_2;" in code
assert " double * RESTRICT _data_y_10_20 = _data_y + _stride_y_1*ctr_1 + _stride_y_2*ctr_2;" in code
assert "_data_x_10_20[_stride_x_0*ctr_0] = _data_y_10_20[_stride_y_0*ctr_0];" in code
config = CreateKernelConfig(base_pointer_specification=[[1]], target=target)
ast = create_kernel(update, config=config)
code = get_code_str(ast)
# intermediate base pointers for x and z
assert "double * RESTRICT _data_x_00_20 = _data_x + _stride_x_0*ctr_0 + _stride_x_2*ctr_2;" in code
assert "double * RESTRICT _data_y_00_20 = _data_y + _stride_y_0*ctr_0 + _stride_y_2*ctr_2;" in code
assert "_data_x_00_20[_stride_x_1*ctr_1] = _data_y_00_20[_stride_y_1*ctr_1];" in code
config = CreateKernelConfig(base_pointer_specification=[[2]], target=target)
ast = create_kernel(update, config=config)
code = get_code_str(ast)
# intermediate base pointers for x and y
assert "double * RESTRICT _data_x_00_10 = _data_x + _stride_x_0*ctr_0 + _stride_x_1*ctr_1;" in code
assert "double * RESTRICT _data_y_00_10 = _data_y + _stride_y_0*ctr_0 + _stride_y_1*ctr_1;" in code
assert "_data_x_00_10[_stride_x_2*ctr_2] = _data_y_00_10[_stride_y_2*ctr_2];" in code
config = CreateKernelConfig(target=target)
ast = create_kernel(update, config=config)
code = get_code_str(ast)
# by default no intermediate base pointers are created
assert "_data_x[_stride_x_0*ctr_0 + _stride_x_1*ctr_1 + _stride_x_2*ctr_2] = " \
"_data_y[_stride_y_0*ctr_0 + _stride_y_1*ctr_1 + _stride_y_2*ctr_2];" in code
import pytest
import numpy as np
import pystencils as ps
from pystencils import Field, Assignment, create_kernel
from pystencils.bit_masks import flag_cond
@pytest.mark.parametrize('mask_type', [np.uint8, np.uint16, np.uint32, np.uint64])
def test_flag_condition(mask_type):
f_arr = np.zeros((2, 2, 2), dtype=np.float64)
mask_arr = np.zeros((2, 2), dtype=mask_type)
mask_arr[0, 1] = (1 << 3)
mask_arr[1, 0] = (1 << 5)
mask_arr[1, 1] = (1 << 3) + (1 << 5)
f = Field.create_from_numpy_array('f', f_arr, index_dimensions=1)
mask = Field.create_from_numpy_array('mask', mask_arr)
v1 = 42.3
v2 = 39.7
v3 = 119
assignments = [
Assignment(f(0), flag_cond(3, mask(0), v1)),
Assignment(f(1), flag_cond(5, mask(0), v2, v3))
]
kernel = create_kernel(assignments).compile()
kernel(f=f_arr, mask=mask_arr)
code = ps.get_code_str(kernel)
assert '119.0' in code
reference = np.zeros((2, 2, 2), dtype=np.float64)
reference[0, 1, 0] = v1
reference[1, 1, 0] = v1
reference[0, 0, 1] = v3
reference[0, 1, 1] = v3
reference[1, 0, 1] = v2
reference[1, 1, 1] = v2
np.testing.assert_array_equal(f_arr, reference)
......@@ -18,14 +18,25 @@ def check_equivalence(assignments, src_arr):
for vectorization in [False, {'assume_inner_stride_one': True}]:
with_blocking = ps.create_kernel(assignments, cpu_blocking=(8, 16, 4), cpu_openmp=openmp,
cpu_vectorize_info=vectorization).compile()
with_blocking_only_over_y = ps.create_kernel(assignments, cpu_blocking=(0, 16, 0), cpu_openmp=openmp,
cpu_vectorize_info=vectorization).compile()
without_blocking = ps.create_kernel(assignments).compile()
print(" openmp {}, vectorization {}".format(openmp, vectorization))
only_omp = ps.create_kernel(assignments, cpu_openmp=2).compile()
print(f" openmp {openmp}, vectorization {vectorization}")
dst_arr = np.zeros_like(src_arr)
dst2_arr = np.zeros_like(src_arr)
dst3_arr = np.zeros_like(src_arr)
ref_arr = np.zeros_like(src_arr)
np.copyto(src_arr, np.random.rand(*src_arr.shape))
with_blocking(src=src_arr, dst=dst_arr)
with_blocking_only_over_y(src=src_arr, dst=dst2_arr)
without_blocking(src=src_arr, dst=ref_arr)
only_omp(src=src_arr, dst=dst3_arr)
np.testing.assert_almost_equal(ref_arr, dst_arr)
np.testing.assert_almost_equal(ref_arr, dst2_arr)
np.testing.assert_almost_equal(ref_arr, dst3_arr)
def test_jacobi3d_var_size():
......@@ -59,3 +70,11 @@ def test_jacobi3d_fixed_size():
arr = np.empty([8*4, 16*2, 4*3])
src, dst = ps.fields("src, dst: double[3D]", src=arr, dst=arr)
check_equivalence(jacobi(dst, src), arr)
def test_jacobi3d_fixed_field_size():
src, dst = ps.fields("src, dst: double[3, 5, 6]", layout='c')
print("Fixed Field Size: Smaller than block sizes")
arr = np.empty([3, 5, 6])
check_equivalence(jacobi(dst, src), arr)
......@@ -12,8 +12,10 @@ def test_blocking_staggered():
f[0, 0, 0] - f[0, 0, -1],
]
assignments = [ps.Assignment(stag.staggered_access(d), terms[i]) for i, d in enumerate(stag.staggered_stencil)]
reference_kernel = ps.create_staggered_kernel(assignments)
print(ps.show_code(reference_kernel))
reference_kernel = reference_kernel.compile()
kernel = ps.create_staggered_kernel(assignments, cpu_blocking=(3, 16, 8)).compile()
reference_kernel = ps.create_staggered_kernel(assignments).compile()
print(ps.show_code(kernel.ast))
f_arr = np.random.rand(80, 33, 19)
......
......@@ -2,13 +2,15 @@ import os
from tempfile import TemporaryDirectory
import numpy as np
import pytest
import pystencils
from pystencils import Assignment, create_kernel
from pystencils.boundaries import BoundaryHandling, Neumann, add_neumann_boundary
from pystencils.boundaries import BoundaryHandling, Dirichlet, Neumann, add_neumann_boundary
from pystencils.datahandling import SerialDataHandling
from pystencils.enums import Target
from pystencils.slicing import slice_from_direction
from pystencils.timeloop import TimeLoop
def test_kernel_vs_copy_boundary():
......@@ -88,3 +90,157 @@ def test_kernel_vs_copy_boundary():
pytest.importorskip('pyevtk')
boundary_handling.geometry_to_vtk(file_name=os.path.join(tmp_dir, 'test_output1'), ghost_layers=False)
boundary_handling.geometry_to_vtk(file_name=os.path.join(tmp_dir, 'test_output2'), ghost_layers=True)
boundaries = list(boundary_handling._boundary_object_to_boundary_info.keys()) + ['domain']
boundary_handling.geometry_to_vtk(file_name=os.path.join(tmp_dir, 'test_output3'),
boundaries=boundaries[0], ghost_layers=False)
def test_boundary_gpu():
pytest.importorskip('cupy')
dh = SerialDataHandling(domain_size=(7, 7), default_target=Target.GPU)
src = dh.add_array('src')
dh.fill("src", 0.0, ghost_layers=True)
dh.fill("src", 1.0, ghost_layers=False)
src_cpu = dh.add_array('src_cpu', gpu=False)
dh.fill("src_cpu", 0.0, ghost_layers=True)
dh.fill("src_cpu", 1.0, ghost_layers=False)
boundary_stencil = [(1, 0), (-1, 0), (0, 1), (0, -1)]
boundary_handling_cpu = BoundaryHandling(dh, src_cpu.name, boundary_stencil,
name="boundary_handling_cpu", target=Target.CPU)
boundary_handling = BoundaryHandling(dh, src.name, boundary_stencil,
name="boundary_handling_gpu", target=Target.GPU)
neumann = Neumann()
for d in ('N', 'S', 'W', 'E'):
boundary_handling.set_boundary(neumann, slice_from_direction(d, dim=2))
boundary_handling_cpu.set_boundary(neumann, slice_from_direction(d, dim=2))
boundary_handling.prepare()
boundary_handling_cpu.prepare()
boundary_handling_cpu()
dh.all_to_gpu()
boundary_handling()
dh.all_to_cpu()
np.testing.assert_almost_equal(dh.cpu_arrays["src_cpu"], dh.cpu_arrays["src"])
def test_boundary_utility():
dh = SerialDataHandling(domain_size=(7, 7))
src = dh.add_array('src')
dh.fill("src", 0.0, ghost_layers=True)
boundary_stencil = [(1, 0), (-1, 0), (0, 1), (0, -1)]
boundary_handling = BoundaryHandling(dh, src.name, boundary_stencil,
name="boundary_handling", target=Target.CPU)
neumann = Neumann()
dirichlet = Dirichlet(2)
for d in ('N', 'S', 'W', 'E'):
boundary_handling.set_boundary(neumann, slice_from_direction(d, dim=2))
boundary_handling.set_boundary(neumann, (slice(2, 4, None), slice(2, 4, None)))
boundary_handling.prepare()
assert boundary_handling.get_flag(boundary_handling.boundary_objects[0]) == 2
assert boundary_handling.shape == dh.shape
assert boundary_handling.flag_array_name == 'boundary_handlingFlags'
mask_neumann = boundary_handling.get_mask((slice(0, 7), slice(0, 7)), boundary_handling.boundary_objects[0])
np.testing.assert_almost_equal(mask_neumann[1:3, 1:3], 2)
mask_domain = boundary_handling.get_mask((slice(0, 7), slice(0, 7)), "domain")
assert np.sum(mask_domain) == 7 ** 2 - 4
def set_sphere(x, y):
mid = (4, 4)
radius = 2
return (x - mid[0]) ** 2 + (y - mid[1]) ** 2 < radius ** 2
boundary_handling.set_boundary(dirichlet, mask_callback=set_sphere, force_flag_value=4)
mask_dirichlet = boundary_handling.get_mask((slice(0, 7), slice(0, 7)), boundary_handling.boundary_objects[1])
assert np.sum(mask_dirichlet) == 48
assert boundary_handling.set_boundary("domain") == 1
assert boundary_handling.set_boundary(dirichlet, mask_callback=set_sphere, force_flag_value=8, replace=False) == 4
assert boundary_handling.set_boundary(dirichlet, force_flag_value=16, replace=False) == 4
assert boundary_handling.set_boundary_where_flag_is_set(boundary_handling.boundary_objects[0], 16) == 16
def test_add_fix_steps():
dh = SerialDataHandling(domain_size=(7, 7))
src = dh.add_array('src')
dh.fill("src", 0.0, ghost_layers=True)
dh.fill("src", 1.0, ghost_layers=False)
boundary_stencil = [(1, 0), (-1, 0), (0, 1), (0, -1)]
boundary_handling = BoundaryHandling(dh, src.name, boundary_stencil,
name="boundary_handling", target=pystencils.Target.CPU)
neumann = Neumann()
for d in ('N', 'S', 'W', 'E'):
boundary_handling.set_boundary(neumann, slice_from_direction(d, dim=2))
timeloop = TimeLoop(steps=1)
boundary_handling.add_fixed_steps(timeloop)
timeloop.run()
assert np.sum(dh.cpu_arrays['src']) == 7 * 7 + 7 * 4
def test_boundary_data_setter():
dh = SerialDataHandling(domain_size=(7, 7))
src = dh.add_array('src')
dh.fill("src", 0.0, ghost_layers=True)
dh.fill("src", 1.0, ghost_layers=False)
boundary_stencil = [(1, 0), (-1, 0), (0, 1), (0, -1)]
boundary_handling = BoundaryHandling(dh, src.name, boundary_stencil,
name="boundary_handling", target=Target.CPU)
neumann = Neumann()
for d in 'N':
boundary_handling.set_boundary(neumann, slice_from_direction(d, dim=2))
boundary_handling.prepare()
for b in dh.iterate(ghost_layers=True):
index_array_bd = b[boundary_handling._index_array_name]
data_setter = index_array_bd.boundary_object_to_data_setter[boundary_handling.boundary_objects[0]]
y_pos = data_setter.boundary_cell_positions(1)
assert all(y_pos == 5.5)
assert np.all(data_setter.link_offsets() == [0, -1])
assert np.all(data_setter.link_positions(1) == 6.)
@pytest.mark.parametrize('with_indices', ('with_indices', False))
def test_dirichlet(with_indices):
value = (1, 20, 3) if with_indices else 1
dh = SerialDataHandling(domain_size=(7, 7))
src = dh.add_array('src', values_per_cell=3 if with_indices else 1)
dh.cpu_arrays.src[...] = np.random.rand(*src.shape)
boundary_stencil = [(1, 0), (-1, 0), (0, 1), (0, -1)]
boundary_handling = BoundaryHandling(dh, src.name, boundary_stencil)
dirichlet = Dirichlet(value)
assert dirichlet.name == 'Dirichlet'
dirichlet.name = "wall"
assert dirichlet.name == 'wall'
for d in ('N', 'S', 'W', 'E'):
boundary_handling.set_boundary(dirichlet, slice_from_direction(d, dim=2))
boundary_handling()
assert all([np.allclose(a, np.array(value)) for a in dh.cpu_arrays.src[1:-2, 0]])
assert all([np.allclose(a, np.array(value)) for a in dh.cpu_arrays.src[1:-2, -1]])
assert all([np.allclose(a, np.array(value)) for a in dh.cpu_arrays.src[0, 1:-2]])
assert all([np.allclose(a, np.array(value)) for a in dh.cpu_arrays.src[-1, 1:-2]])
import numpy as np
from itertools import product
import pystencils.boundaries.createindexlist as cil
import pytest
@pytest.mark.parametrize('single_link', [False, True])
@pytest.mark.skipif(not cil.cython_funcs_available, reason='Cython functions are not available')
def test_equivalence_cython_python_version(single_link):
# D2Q9
stencil_2d = tuple((x, y) for x, y in product([-1, 0, 1], [-1, 0, 1]))
# D3Q19
stencil_3d = tuple(
(x, y, z) for x, y, z in product([-1, 0, 1], [-1, 0, 1], [-1, 0, 1]) if abs(x) + abs(y) + abs(z) < 3)
for dtype in [int, np.int16, np.uint32]:
fluid_mask = dtype(1)
mask = dtype(2)
flag_field_2d = np.ones([15, 16], dtype=dtype) * fluid_mask
flag_field_3d = np.ones([15, 16, 17], dtype=dtype) * fluid_mask
flag_field_2d[0, :] = mask
flag_field_2d[-1, :] = mask
flag_field_2d[7, 7] = mask
flag_field_3d[0, :, :] = mask
flag_field_3d[-1, :, :] = mask
flag_field_3d[7, 7, 7] = mask
result_python_2d = cil._create_index_list_python(flag_field_2d, mask, fluid_mask,
stencil_2d, single_link, True, 1)
result_python_3d = cil._create_index_list_python(flag_field_3d, mask, fluid_mask,
stencil_3d, single_link, True, 1)
result_cython_2d = cil.create_boundary_index_list(flag_field_2d, stencil_2d, mask,
fluid_mask, 1, True, single_link)
result_cython_3d = cil.create_boundary_index_list(flag_field_3d, stencil_3d, mask,
fluid_mask, 1, True, single_link)
np.testing.assert_equal(result_python_2d, result_cython_2d)
np.testing.assert_equal(result_python_3d, result_cython_3d)
@pytest.mark.parametrize('single_link', [False, True])
@pytest.mark.skipif(not cil.cython_funcs_available, reason='Cython functions are not available')
def test_equivalence_cell_idx_list_cython_python_version(single_link):
# D2Q9
stencil_2d = tuple((x, y) for x, y in product([-1, 0, 1], [-1, 0, 1]))
# D3Q19
stencil_3d = tuple(
(x, y, z) for x, y, z in product([-1, 0, 1], [-1, 0, 1], [-1, 0, 1]) if abs(x) + abs(y) + abs(z) < 3)
for dtype in [int, np.int16, np.uint32]:
fluid_mask = dtype(1)
mask = dtype(2)
flag_field_2d = np.ones([15, 16], dtype=dtype) * fluid_mask
flag_field_3d = np.ones([15, 16, 17], dtype=dtype) * fluid_mask
flag_field_2d[0, :] = mask
flag_field_2d[-1, :] = mask
flag_field_2d[7, 7] = mask
flag_field_3d[0, :, :] = mask
flag_field_3d[-1, :, :] = mask
flag_field_3d[7, 7, 7] = mask
result_python_2d = cil._create_index_list_python(flag_field_2d, mask, fluid_mask,
stencil_2d, single_link, False)
result_python_3d = cil._create_index_list_python(flag_field_3d, mask, fluid_mask,
stencil_3d, single_link, False)
result_cython_2d = cil.create_boundary_index_list(flag_field_2d, stencil_2d, mask, fluid_mask, None,
False, single_link)
result_cython_3d = cil.create_boundary_index_list(flag_field_3d, stencil_3d, mask, fluid_mask, None,
False, single_link)
np.testing.assert_equal(result_python_2d, result_cython_2d)
np.testing.assert_equal(result_python_3d, result_cython_3d)
@pytest.mark.parametrize('inner_or_boundary', [False, True])
def test_normal_calculation(inner_or_boundary):
stencil = tuple((x, y) for x, y in product([-1, 0, 1], [-1, 0, 1]))
domain_size = (32, 32)
dtype = np.uint32
fluid_mask = dtype(1)
mask = dtype(2)
flag_field = np.ones([domain_size[0], domain_size[1]], dtype=dtype) * fluid_mask
radius_inner = domain_size[0] // 4
radius_outer = domain_size[0] // 2
y_mid = domain_size[1] / 2
x_mid = domain_size[0] / 2
for x in range(0, domain_size[0]):
for y in range(0, domain_size[1]):
if (y - y_mid) ** 2 + (x - x_mid) ** 2 < radius_inner ** 2:
flag_field[x, y] = mask
if (x - x_mid) ** 2 + (y - y_mid) ** 2 > radius_outer ** 2:
flag_field[x, y] = mask
args_no_gl = (flag_field, mask, fluid_mask, np.array(stencil, dtype=np.int32), True)
index_list = cil._create_index_list_python(*args_no_gl, inner_or_boundary=inner_or_boundary, nr_of_ghost_layers=1)
checkmask = mask if inner_or_boundary else fluid_mask
for cell in index_list:
idx = cell[2]
cell = tuple((cell[0], cell[1]))
sum_cells = np.zeros(len(cell))
for dir_idx, direction in enumerate(stencil):
neighbor_cell = tuple([cell_i + dir_i for cell_i, dir_i in zip(cell, direction)])
if any(not 0 <= e < upper for e, upper in zip(neighbor_cell, flag_field.shape)):
continue
if flag_field[neighbor_cell] & checkmask:
sum_cells += np.array(direction)
assert np.argmax(np.inner(sum_cells, stencil)) == idx
......@@ -2,6 +2,7 @@
import numpy as np
import pystencils as ps
from pystencils import Assignment, Field, FieldType, create_kernel
from pystencils.field import create_numpy_array_with_layout, layout_string_to_tuple
from pystencils.slicing import (
......@@ -19,9 +20,9 @@ def _generate_fields(dt=np.uint64, num_directions=1, layout='numpy'):
fields = []
for size in field_sizes:
field_layout = layout_string_to_tuple(layout, len(size))
src_arr = create_numpy_array_with_layout(size, field_layout)
src_arr = create_numpy_array_with_layout(size, field_layout, dtype=dt)
array_data = np.reshape(np.arange(1, int(np.prod(size)+1)), size)
array_data = np.reshape(np.arange(1, int(np.prod(size) + 1)), size)
# Use flat iterator to input data into the array
src_arr.flat = add_ghost_layers(array_data, index_dimensions=1 if num_directions > 1 else 0).astype(dt).flat
dst_arr = np.zeros(src_arr.shape, dtype=dt)
......@@ -40,13 +41,18 @@ def test_full_scalar_field():
field_type=FieldType.BUFFER, dtype=src_arr.dtype)
pack_eqs = [Assignment(buffer.center(), src_field.center())]
pack_code = create_kernel(pack_eqs, data_type={'src_field': src_arr.dtype, 'buffer': buffer.dtype})
config = ps.CreateKernelConfig(data_type={'src_field': src_arr.dtype, 'buffer': buffer.dtype})
pack_code = create_kernel(pack_eqs, config=config)
code = ps.get_code_str(pack_code)
ps.show_code(pack_code)
pack_kernel = pack_code.compile()
pack_kernel(buffer=buffer_arr, src_field=src_arr)
unpack_eqs = [Assignment(dst_field.center(), buffer.center())]
unpack_code = create_kernel(unpack_eqs, data_type={'dst_field': dst_arr.dtype, 'buffer': buffer.dtype})
config = ps.CreateKernelConfig(data_type={'dst_field': dst_arr.dtype, 'buffer': buffer.dtype})
unpack_code = create_kernel(unpack_eqs, config=config)
unpack_kernel = unpack_code.compile()
unpack_kernel(dst_field=dst_arr, buffer=buffer_arr)
......@@ -70,14 +76,18 @@ def test_field_slice():
field_type=FieldType.BUFFER, dtype=src_arr.dtype)
pack_eqs = [Assignment(buffer.center(), src_field.center())]
pack_code = create_kernel(pack_eqs, data_type={'src_field': src_arr.dtype, 'buffer': buffer.dtype})
config = ps.CreateKernelConfig(data_type={'src_field': src_arr.dtype, 'buffer': buffer.dtype})
pack_code = create_kernel(pack_eqs, config=config)
pack_kernel = pack_code.compile()
pack_kernel(buffer=bufferArr, src_field=src_arr[pack_slice])
# Unpack into ghost layer of dst_field in N direction
unpack_eqs = [Assignment(dst_field.center(), buffer.center())]
unpack_code = create_kernel(unpack_eqs, data_type={'dst_field': dst_arr.dtype, 'buffer': buffer.dtype})
config = ps.CreateKernelConfig(data_type={'dst_field': dst_arr.dtype, 'buffer': buffer.dtype})
unpack_code = create_kernel(unpack_eqs, config=config)
unpack_kernel = unpack_code.compile()
unpack_kernel(buffer=bufferArr, dst_field=dst_arr[unpack_slice])
......@@ -102,7 +112,8 @@ def test_all_cell_values():
eq = Assignment(buffer(idx), src_field(idx))
pack_eqs.append(eq)
pack_code = create_kernel(pack_eqs, data_type={'src_field': src_arr.dtype, 'buffer': buffer.dtype})
config = ps.CreateKernelConfig(data_type={'src_field': src_arr.dtype, 'buffer': buffer.dtype})
pack_code = create_kernel(pack_eqs, config=config)
pack_kernel = pack_code.compile()
pack_kernel(buffer=bufferArr, src_field=src_arr)
......@@ -112,7 +123,8 @@ def test_all_cell_values():
eq = Assignment(dst_field(idx), buffer(idx))
unpack_eqs.append(eq)
unpack_code = create_kernel(unpack_eqs, data_type={'dst_field': dst_arr.dtype, 'buffer': buffer.dtype})
config = ps.CreateKernelConfig(data_type={'dst_field': dst_arr.dtype, 'buffer': buffer.dtype})
unpack_code = create_kernel(unpack_eqs, config=config)
unpack_kernel = unpack_code.compile()
unpack_kernel(buffer=bufferArr, dst_field=dst_arr)
......@@ -138,7 +150,8 @@ def test_subset_cell_values():
eq = Assignment(buffer(buffer_idx), src_field(cell_idx))
pack_eqs.append(eq)
pack_code = create_kernel(pack_eqs, data_type={'src_field': src_arr.dtype, 'buffer': buffer.dtype})
config = ps.CreateKernelConfig(data_type={'src_field': src_arr.dtype, 'buffer': buffer.dtype})
pack_code = create_kernel(pack_eqs, config=config)
pack_kernel = pack_code.compile()
pack_kernel(buffer=bufferArr, src_field=src_arr)
......@@ -148,7 +161,8 @@ def test_subset_cell_values():
eq = Assignment(dst_field(cell_idx), buffer(buffer_idx))
unpack_eqs.append(eq)
unpack_code = create_kernel(unpack_eqs, data_type={'dst_field': dst_arr.dtype, 'buffer': buffer.dtype})
config = ps.CreateKernelConfig(data_type={'dst_field': dst_arr.dtype, 'buffer': buffer.dtype})
unpack_code = create_kernel(unpack_eqs, config=config)
unpack_kernel = unpack_code.compile()
unpack_kernel(buffer=bufferArr, dst_field=dst_arr)
......@@ -173,7 +187,8 @@ def test_field_layouts():
eq = Assignment(buffer(idx), src_field(idx))
pack_eqs.append(eq)
pack_code = create_kernel(pack_eqs, data_type={'src_field': src_arr.dtype, 'buffer': buffer.dtype})
config = ps.CreateKernelConfig(data_type={'src_field': src_arr.dtype, 'buffer': buffer.dtype})
pack_code = create_kernel(pack_eqs, config=config)
pack_kernel = pack_code.compile()
pack_kernel(buffer=bufferArr, src_field=src_arr)
......@@ -183,6 +198,62 @@ def test_field_layouts():
eq = Assignment(dst_field(idx), buffer(idx))
unpack_eqs.append(eq)
unpack_code = create_kernel(unpack_eqs, data_type={'dst_field': dst_arr.dtype, 'buffer': buffer.dtype})
config = ps.CreateKernelConfig(data_type={'dst_field': dst_arr.dtype, 'buffer': buffer.dtype})
unpack_code = create_kernel(unpack_eqs, config=config)
unpack_kernel = unpack_code.compile()
unpack_kernel(buffer=bufferArr, dst_field=dst_arr)
def test_iteration_slices():
num_cell_values = 19
dt = np.uint64
fields = _generate_fields(dt=dt, num_directions=num_cell_values)
for (src_arr, dst_arr, bufferArr) in fields:
spatial_dimensions = len(src_arr.shape) - 1
# src_field = Field.create_from_numpy_array("src_field", src_arr, index_dimensions=1)
# dst_field = Field.create_from_numpy_array("dst_field", dst_arr, index_dimensions=1)
src_field = Field.create_generic("src_field", spatial_dimensions, index_shape=(num_cell_values,), dtype=dt)
dst_field = Field.create_generic("dst_field", spatial_dimensions, index_shape=(num_cell_values,), dtype=dt)
buffer = Field.create_generic("buffer", spatial_dimensions=1, index_dimensions=1,
field_type=FieldType.BUFFER, dtype=src_arr.dtype)
pack_eqs = []
# Since we are packing all cell values for all cells, then
# the buffer index is equivalent to the field index
for idx in range(num_cell_values):
eq = Assignment(buffer(idx), src_field(idx))
pack_eqs.append(eq)
dim = src_field.spatial_dimensions
# Pack only the leftmost slice, only every second cell
pack_slice = (slice(None, None, 2),) * (dim - 1) + (0,)
# Fill the entire array with data
src_arr[(slice(None, None, 1),) * dim] = np.arange(num_cell_values)
dst_arr.fill(0)
config = ps.CreateKernelConfig(iteration_slice=pack_slice,
data_type={'src_field': src_arr.dtype, 'buffer': buffer.dtype})
pack_code = create_kernel(pack_eqs, config=config)
pack_kernel = pack_code.compile()
pack_kernel(buffer=bufferArr, src_field=src_arr)
unpack_eqs = []
for idx in range(num_cell_values):
eq = Assignment(dst_field(idx), buffer(idx))
unpack_eqs.append(eq)
config = ps.CreateKernelConfig(iteration_slice=pack_slice,
data_type={'dst_field': dst_arr.dtype, 'buffer': buffer.dtype})
unpack_code = create_kernel(unpack_eqs, config=config)
unpack_kernel = unpack_code.compile()
unpack_kernel(buffer=bufferArr, dst_field=dst_arr)
# Check if only every second entry of the leftmost slice has been copied
np.testing.assert_equal(dst_arr[pack_slice], src_arr[pack_slice])
np.testing.assert_equal(dst_arr[(slice(1, None, 2),) * (dim - 1) + (0,)], 0)
np.testing.assert_equal(dst_arr[(slice(None, None, 1),) * (dim - 1) + (slice(1, None),)], 0)
"""Tests for the (un)packing (from)to buffers on a CUDA GPU."""
from dataclasses import replace
import numpy as np
import pytest
from pystencils import Assignment, Field, FieldType
import pystencils
from pystencils import Assignment, Field, FieldType, Target, CreateKernelConfig, create_kernel, fields
from pystencils.bit_masks import flag_cond
from pystencils.field import create_numpy_array_with_layout, layout_string_to_tuple
from pystencils.gpucuda import create_cuda_kernel, make_python_function
from pystencils.slicing import (
add_ghost_layers, get_ghost_region_slice, get_slice_before_ghost_layer)
from pystencils.stencil import direction_string_to_offset
try:
# noinspection PyUnresolvedReferences
import pycuda.autoinit
import pycuda.gpuarray as gpuarray
import cupy as cp
except ImportError:
pass
......@@ -22,7 +23,7 @@ FIELD_SIZES = [(4, 3), (9, 3, 7)]
def _generate_fields(dt=np.uint8, stencil_directions=1, layout='numpy'):
pytest.importorskip('pycuda')
pytest.importorskip('cupy')
field_sizes = FIELD_SIZES
if stencil_directions > 1:
field_sizes = [s + (stencil_directions,) for s in field_sizes]
......@@ -37,9 +38,10 @@ def _generate_fields(dt=np.uint8, stencil_directions=1, layout='numpy'):
src_arr.flat = add_ghost_layers(array_data,
index_dimensions=1 if stencil_directions > 1 else 0).astype(dt).flat
gpu_src_arr = gpuarray.to_gpu(src_arr)
gpu_dst_arr = gpuarray.zeros_like(gpu_src_arr)
gpu_buffer_arr = gpuarray.zeros(np.prod(src_arr.shape), dtype=dt)
gpu_src_arr = cp.asarray(src_arr)
gpu_dst_arr = cp.zeros_like(gpu_src_arr)
size = int(np.prod(src_arr.shape))
gpu_buffer_arr = cp.zeros(size, dtype=dt)
fields.append((src_arr, gpu_src_arr, gpu_dst_arr, gpu_buffer_arr))
return fields
......@@ -56,16 +58,20 @@ def test_full_scalar_field():
pack_eqs = [Assignment(buffer.center(), src_field.center())]
pack_types = {'src_field': gpu_src_arr.dtype, 'buffer': gpu_buffer_arr.dtype}
pack_code = create_cuda_kernel(pack_eqs, type_info=pack_types)
pack_kernel = make_python_function(pack_code)
config = CreateKernelConfig(target=pystencils.Target.GPU, data_type=pack_types)
pack_ast = create_kernel(pack_eqs, config=config)
pack_kernel = pack_ast.compile()
pack_kernel(buffer=gpu_buffer_arr, src_field=gpu_src_arr)
unpack_eqs = [Assignment(dst_field.center(), buffer.center())]
unpack_types = {'dst_field': gpu_dst_arr.dtype, 'buffer': gpu_buffer_arr.dtype}
unpack_code = create_cuda_kernel(unpack_eqs, type_info=unpack_types)
unpack_kernel = make_python_function(unpack_code)
config = CreateKernelConfig(target=pystencils.Target.GPU, data_type=unpack_types)
unpack_ast = create_kernel(unpack_eqs, config=config)
unpack_kernel = unpack_ast.compile()
unpack_kernel(dst_field=gpu_dst_arr, buffer=gpu_buffer_arr)
dst_arr = gpu_dst_arr.get()
......@@ -90,17 +96,21 @@ def test_field_slice():
pack_eqs = [Assignment(buffer.center(), src_field.center())]
pack_types = {'src_field': gpu_src_arr.dtype, 'buffer': gpu_buffer_arr.dtype}
pack_code = create_cuda_kernel(pack_eqs, type_info=pack_types)
pack_kernel = make_python_function(pack_code)
config = CreateKernelConfig(target=pystencils.Target.GPU, data_type=pack_types)
pack_ast = create_kernel(pack_eqs, config=config)
pack_kernel = pack_ast.compile()
pack_kernel(buffer=gpu_buffer_arr, src_field=gpu_src_arr[pack_slice])
# Unpack into ghost layer of dst_field in N direction
unpack_eqs = [Assignment(dst_field.center(), buffer.center())]
unpack_types = {'dst_field': gpu_dst_arr.dtype, 'buffer': gpu_buffer_arr.dtype}
unpack_code = create_cuda_kernel(unpack_eqs, type_info=unpack_types)
unpack_kernel = make_python_function(unpack_code)
config = CreateKernelConfig(target=pystencils.Target.GPU, data_type=unpack_types)
unpack_ast = create_kernel(unpack_eqs, config=config)
unpack_kernel = unpack_ast.compile()
unpack_kernel(buffer=gpu_buffer_arr, dst_field=gpu_dst_arr[unpack_slice])
dst_arr = gpu_dst_arr.get()
......@@ -126,8 +136,11 @@ def test_all_cell_values():
pack_eqs.append(eq)
pack_types = {'src_field': gpu_src_arr.dtype, 'buffer': gpu_buffer_arr.dtype}
pack_code = create_cuda_kernel(pack_eqs, type_info=pack_types)
pack_kernel = make_python_function(pack_code)
config = CreateKernelConfig(target=pystencils.Target.GPU, data_type=pack_types)
pack_code = create_kernel(pack_eqs, config=config)
pack_kernel = pack_code.compile()
pack_kernel(buffer=gpu_buffer_arr, src_field=gpu_src_arr)
unpack_eqs = []
......@@ -137,8 +150,10 @@ def test_all_cell_values():
unpack_eqs.append(eq)
unpack_types = {'dst_field': gpu_dst_arr.dtype, 'buffer': gpu_buffer_arr.dtype}
unpack_code = create_cuda_kernel(unpack_eqs, type_info=unpack_types)
unpack_kernel = make_python_function(unpack_code)
config = CreateKernelConfig(target=pystencils.Target.GPU, data_type=unpack_types)
unpack_ast = create_kernel(unpack_eqs, config=config)
unpack_kernel = unpack_ast.compile()
unpack_kernel(buffer=gpu_buffer_arr, dst_field=gpu_dst_arr)
dst_arr = gpu_dst_arr.get()
......@@ -147,7 +162,7 @@ def test_all_cell_values():
def test_subset_cell_values():
"""Tests (un)packing a subset of cell values of the a field (from)to a buffer."""
"""Tests (un)packing a subset of cell values of a field (from)to a buffer."""
num_cell_values = 7
# Cell indices of the field to be (un)packed (from)to the buffer
cell_indices = [1, 3, 5, 6]
......@@ -166,8 +181,9 @@ def test_subset_cell_values():
pack_eqs.append(eq)
pack_types = {'src_field': gpu_src_arr.dtype, 'buffer': gpu_buffer_arr.dtype}
pack_code = create_cuda_kernel(pack_eqs, type_info=pack_types)
pack_kernel = make_python_function(pack_code)
config = CreateKernelConfig(target=pystencils.Target.GPU, data_type=pack_types)
pack_ast = create_kernel(pack_eqs, config=config)
pack_kernel = pack_ast.compile()
pack_kernel(buffer=gpu_buffer_arr, src_field=gpu_src_arr)
unpack_eqs = []
......@@ -177,8 +193,10 @@ def test_subset_cell_values():
unpack_eqs.append(eq)
unpack_types = {'dst_field': gpu_dst_arr.dtype, 'buffer': gpu_buffer_arr.dtype}
unpack_code = create_cuda_kernel(unpack_eqs, type_info=unpack_types)
unpack_kernel = make_python_function(unpack_code)
config = CreateKernelConfig(target=pystencils.Target.GPU, data_type=unpack_types)
unpack_ast = create_kernel(unpack_eqs, config=config)
unpack_kernel = unpack_ast.compile()
unpack_kernel(buffer=gpu_buffer_arr, dst_field=gpu_dst_arr)
dst_arr = gpu_dst_arr.get()
......@@ -205,8 +223,10 @@ def test_field_layouts():
pack_eqs.append(eq)
pack_types = {'src_field': gpu_src_arr.dtype, 'buffer': gpu_buffer_arr.dtype}
pack_code = create_cuda_kernel(pack_eqs, type_info=pack_types)
pack_kernel = make_python_function(pack_code)
config = CreateKernelConfig(target=pystencils.Target.GPU, data_type=pack_types)
pack_ast = create_kernel(pack_eqs, config=config)
pack_kernel = pack_ast.compile()
pack_kernel(buffer=gpu_buffer_arr, src_field=gpu_src_arr)
unpack_eqs = []
......@@ -216,6 +236,99 @@ def test_field_layouts():
unpack_eqs.append(eq)
unpack_types = {'dst_field': gpu_dst_arr.dtype, 'buffer': gpu_buffer_arr.dtype}
unpack_code = create_cuda_kernel(unpack_eqs, type_info=unpack_types)
unpack_kernel = make_python_function(unpack_code)
config = CreateKernelConfig(target=pystencils.Target.GPU, data_type=unpack_types)
unpack_ast = create_kernel(unpack_eqs, config=config)
unpack_kernel = unpack_ast.compile()
unpack_kernel(buffer=gpu_buffer_arr, dst_field=gpu_dst_arr)
def test_buffer_indexing():
src_field, dst_field = fields(f'pdfs_src(19), pdfs_dst(19) :double[3D]')
mask_field = fields(f'mask : uint32 [3D]')
buffer = Field.create_generic('buffer', spatial_dimensions=1, field_type=FieldType.BUFFER,
dtype="float64",
index_shape=(19,))
src_field_size = src_field.spatial_shape
mask_field_size = mask_field.spatial_shape
up = Assignment(buffer(0), flag_cond(1, mask_field.center, src_field[0, 1, 0](1)))
iteration_slice = tuple(slice(None, None, 2) for _ in range(3))
config = CreateKernelConfig(target=Target.GPU)
config = replace(config, iteration_slice=iteration_slice, ghost_layers=0)
ast = create_kernel(up, config=config)
parameters = ast.get_parameters()
spatial_shape_symbols = [p.symbol for p in parameters if p.is_field_shape]
# The loop counters as well as the resolved field access should depend on one common spatial shape
if spatial_shape_symbols[0] in mask_field_size:
for s in spatial_shape_symbols:
assert s in mask_field_size
if spatial_shape_symbols[0] in src_field_size:
for s in spatial_shape_symbols:
assert s in src_field_size
assert len(spatial_shape_symbols) <= 3
@pytest.mark.parametrize('gpu_indexing', ("block", "line"))
def test_iteration_slices(gpu_indexing):
num_cell_values = 19
dt = np.uint64
fields = _generate_fields(dt=dt, stencil_directions=num_cell_values)
for (src_arr, gpu_src_arr, gpu_dst_arr, gpu_buffer_arr) in fields:
src_field = Field.create_from_numpy_array("src_field", gpu_src_arr, index_dimensions=1)
dst_field = Field.create_from_numpy_array("dst_field", gpu_src_arr, index_dimensions=1)
buffer = Field.create_generic("buffer", spatial_dimensions=1, index_dimensions=1,
field_type=FieldType.BUFFER, dtype=src_arr.dtype)
pack_eqs = []
# Since we are packing all cell values for all cells, then
# the buffer index is equivalent to the field index
for idx in range(num_cell_values):
eq = Assignment(buffer(idx), src_field(idx))
pack_eqs.append(eq)
dim = src_field.spatial_dimensions
# Pack only the leftmost slice, only every second cell
pack_slice = (slice(None, None, 2),) * (dim - 1) + (0,)
# Fill the entire array with data
src_arr[(slice(None, None, 1),) * dim] = np.arange(num_cell_values)
gpu_src_arr.set(src_arr)
gpu_dst_arr.fill(0)
config = CreateKernelConfig(target=Target.GPU, iteration_slice=pack_slice,
data_type={'src_field': gpu_src_arr.dtype, 'buffer': gpu_buffer_arr.dtype},
gpu_indexing=gpu_indexing)
pack_code = create_kernel(pack_eqs, config=config)
pack_kernel = pack_code.compile()
pack_kernel(buffer=gpu_buffer_arr, src_field=gpu_src_arr)
unpack_eqs = []
for idx in range(num_cell_values):
eq = Assignment(dst_field(idx), buffer(idx))
unpack_eqs.append(eq)
config = CreateKernelConfig(target=Target.GPU, iteration_slice=pack_slice,
data_type={'dst_field': gpu_dst_arr.dtype, 'buffer': gpu_buffer_arr.dtype},
gpu_indexing=gpu_indexing)
unpack_code = create_kernel(unpack_eqs, config=config)
unpack_kernel = unpack_code.compile()
unpack_kernel(buffer=gpu_buffer_arr, dst_field=gpu_dst_arr)
dst_arr = gpu_dst_arr.get()
src_arr = gpu_src_arr.get()
# Check if only every second entry of the leftmost slice has been copied
np.testing.assert_equal(dst_arr[pack_slice], src_arr[pack_slice])
np.testing.assert_equal(dst_arr[(slice(1, None, 2),) * (dim - 1) + (0,)], 0)
np.testing.assert_equal(dst_arr[(slice(None, None, 1),) * (dim - 1) + (slice(1, None),)], 0)
......@@ -35,11 +35,11 @@ def add_fixed_constant_boundary_handling(assignments, with_cse):
for a in assignment.rhs.atoms(Field.Access) if not a.is_absolute_access
})) for assignment in assignments.all_assignments]
subs = [{a: ConditionalFieldAccess(a, is_out_of_bound(
sp.Matrix(a.offsets) + x_vector(ndim), common_shape))
for a in assignment.rhs.atoms(Field.Access) if not a.is_absolute_access
} for assignment in assignments.all_assignments]
print(subs)
# subs = [{a: ConditionalFieldAccess(a, is_out_of_bound(
# sp.Matrix(a.offsets) + x_vector(ndim), common_shape))
# for a in assignment.rhs.atoms(Field.Access) if not a.is_absolute_access
# } for assignment in assignments.all_assignments]
# print(subs)
if with_cse:
safe_assignments = sympy_cse(ps.AssignmentCollection(safe_assignments))
......@@ -48,22 +48,20 @@ def add_fixed_constant_boundary_handling(assignments, with_cse):
return ps.AssignmentCollection(safe_assignments)
@pytest.mark.parametrize('dtype', ('float64', 'float32'))
@pytest.mark.parametrize('with_cse', (False, 'with_cse'))
def test_boundary_check(with_cse):
def test_boundary_check(dtype, with_cse):
f, g = ps.fields(f"f, g : {dtype}[2D]")
stencil = ps.Assignment(g[0, 0], (f[1, 0] + f[-1, 0] + f[0, 1] + f[0, -1]) / 4)
f, g = ps.fields("f, g : [2D]")
stencil = ps.Assignment(g[0, 0],
(f[1, 0] + f[-1, 0] + f[0, 1] + f[0, -1]) / 4)
f_arr = np.random.rand(1000, 1000)
f_arr = np.random.rand(10, 10).astype(dtype=dtype)
g_arr = np.zeros_like(f_arr)
# kernel(f=f_arr, g=g_arr)
assignments = add_fixed_constant_boundary_handling(ps.AssignmentCollection([stencil]), with_cse)
print(assignments)
kernel_checked = ps.create_kernel(assignments, ghost_layers=0).compile()
ps.show_code(kernel_checked)
config = ps.CreateKernelConfig(data_type=dtype, default_number_float=dtype, ghost_layers=0)
kernel_checked = ps.create_kernel(assignments, config=config).compile()
# ps.show_code(kernel_checked)
# No SEGFAULT, please!!
kernel_checked(f=f_arr, g=g_arr)
import numpy as np
import sympy as sp
import pytest
import pystencils as ps
from pystencils.alignedarray import aligned_zeros
from pystencils.astnodes import Block, Conditional, SympyAssignment
from pystencils.backends.simd_instruction_sets import get_supported_instruction_sets, get_vector_instruction_set
from pystencils.enums import Target
from pystencils.cpu.vectorization import vec_all, vec_any
from pystencils.node_collection import NodeCollection
supported_instruction_sets = get_supported_instruction_sets() if get_supported_instruction_sets() else []
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
def test_vec_any(instruction_set, dtype):
if instruction_set in ['sve', 'sve2', 'sme', 'rvv']:
width = 4 # we don't know the actual value
else:
width = get_vector_instruction_set(dtype, instruction_set)['width']
data_arr = np.zeros((4 * width, 4 * width), dtype=dtype)
data_arr[3:9, 1:3 * width - 1] = 1.0
data = ps.fields(f"data: {dtype}[2D]", data=data_arr)
c = [
SympyAssignment(sp.Symbol("t1"), vec_any(data.center() > 0.0)),
Conditional(vec_any(data.center() > 0.0), Block([SympyAssignment(data.center(), 2.0)]))
]
assignmets = NodeCollection(c)
ast = ps.create_kernel(assignments=assignmets, target=ps.Target.CPU,
cpu_vectorize_info={'instruction_set': instruction_set})
kernel = ast.compile()
kernel(data=data_arr)
if instruction_set in ['sve', 'sve2', 'sme', 'rvv']:
# we only know that the first value has changed
np.testing.assert_equal(data_arr[3:9, :3 * width - 1], 2.0)
else:
np.testing.assert_equal(data_arr[3:9, :3 * width], 2.0)
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
def test_vec_all(instruction_set, dtype):
if instruction_set in ['sve', 'sve2', 'sme', 'rvv']:
width = 1000 # we don't know the actual value, need something guaranteed larger than vector
else:
width = get_vector_instruction_set(dtype, instruction_set)['width']
data_arr = np.zeros((4 * width, 4 * width), dtype=dtype)
data_arr[3:9, 1:3 * width - 1] = 1.0
data = ps.fields(f"data: {dtype}[2D]", data=data_arr)
c = [Conditional(vec_all(data.center() > 0.0), Block([SympyAssignment(data.center(), 2.0)]))]
assignmets = NodeCollection(c)
ast = ps.create_kernel(assignmets, target=Target.CPU,
cpu_vectorize_info={'instruction_set': instruction_set})
kernel = ast.compile()
kernel(data=data_arr)
if instruction_set in ['sve', 'sve2', 'sme', 'rvv']:
# we only know that some values in the middle have been replaced
assert np.all(data_arr[3:9, :2] <= 1.0)
assert np.any(data_arr[3:9, 2:] == 2.0)
else:
np.testing.assert_equal(data_arr[3:9, :1], 0.0)
np.testing.assert_equal(data_arr[3:9, 1:width], 1.0)
np.testing.assert_equal(data_arr[3:9, width:2 * width], 2.0)
np.testing.assert_equal(data_arr[3:9, 2 * width:3 * width - 1], 1.0)
np.testing.assert_equal(data_arr[3:9, 3 * width - 1:], 0.0)
@pytest.mark.skipif(not supported_instruction_sets, reason='cannot detect CPU instruction set')
def test_boolean_before_loop():
t1, t2 = sp.symbols('t1, t2')
f_arr = np.ones((10, 10))
g_arr = np.zeros_like(f_arr)
f, g = ps.fields("f, g : double[2D]", f=f_arr, g=g_arr)
a = [
ps.Assignment(t1, t2 > 0),
ps.Assignment(g[0, 0],
sp.Piecewise((f[0, 0], t1), (42, True)))
]
ast = ps.create_kernel(a, cpu_vectorize_info={'instruction_set': supported_instruction_sets[-1]})
kernel = ast.compile()
kernel(f=f_arr, g=g_arr, t2=1.0)
# print(g)
np.testing.assert_array_equal(g_arr, 1.0)
kernel(f=f_arr, g=g_arr, t2=-1.0)
np.testing.assert_array_equal(g_arr, 42.0)
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('nontemporal', [False, True])
@pytest.mark.parametrize('aligned', [False, True])
def test_vec_maskstore(instruction_set, dtype, nontemporal, aligned):
data_arr = (aligned_zeros if aligned else np.zeros)((16, 16), dtype=dtype)
data_arr[3:-3, 3:-3] = 1.0
data = ps.fields(f"data: {dtype}[2D]", data=data_arr)
c = [Conditional(data.center() < 1.0, Block([SympyAssignment(data.center(), 2.0)]))]
assignmets = NodeCollection(c)
config = ps.CreateKernelConfig(cpu_vectorize_info={'instruction_set': instruction_set,
'nontemporal': nontemporal,
'assume_aligned': aligned},
default_number_float=dtype)
ast = ps.create_kernel(assignmets, config=config)
if 'maskStore' in ast.instruction_set:
instruction = 'maskStream' if nontemporal and 'maskStream' in ast.instruction_set else (
'maskStoreA' if aligned and 'maskStoreA' in ast.instruction_set else 'maskStore')
assert ast.instruction_set[instruction].split('{')[0] in ps.get_code_str(ast)
print(ps.get_code_str(ast))
kernel = ast.compile()
kernel(data=data_arr)
np.testing.assert_equal(data_arr[:3, :], 2.0)
np.testing.assert_equal(data_arr[-3:, :], 2.0)
np.testing.assert_equal(data_arr[:, :3], 2.0)
np.testing.assert_equal(data_arr[:, -3:], 2.0)
np.testing.assert_equal(data_arr[3:-3, 3:-3], 1.0)
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('nontemporal', [False, True])
def test_vec_maskscatter(instruction_set, dtype, nontemporal):
data_arr = np.zeros((16, 16), dtype=dtype)
data_arr[3:-3, 3:-3] = 1.0
data = ps.fields(f"data: {dtype}[2D]")
c = [Conditional(data.center() < 1.0, Block([SympyAssignment(data.center(), 2.0)]))]
assignmets = NodeCollection(c)
config = ps.CreateKernelConfig(cpu_vectorize_info={'instruction_set': instruction_set,
'nontemporal': nontemporal},
default_number_float=dtype)
if 'maskStoreS' not in get_vector_instruction_set(dtype, instruction_set) \
and not instruction_set.startswith('sve'):
with pytest.warns(UserWarning) as warn:
ast = ps.create_kernel(assignmets, config=config)
assert 'Could not vectorize loop' in warn[0].message.args[0]
else:
with pytest.warns(None) as warn:
ast = ps.create_kernel(assignmets, config=config)
assert len(warn) == 0
instruction = 'maskStreamS' if nontemporal and 'maskStreamS' in ast.instruction_set else 'maskStoreS'
assert ast.instruction_set[instruction].split('{')[0] in ps.get_code_str(ast)
print(ps.get_code_str(ast))
kernel = ast.compile()
kernel(data=data_arr)
np.testing.assert_equal(data_arr[:3, :], 2.0)
np.testing.assert_equal(data_arr[-3:, :], 2.0)
np.testing.assert_equal(data_arr[:, :3], 2.0)
np.testing.assert_equal(data_arr[:, -3:], 2.0)
np.testing.assert_equal(data_arr[3:-3, 3:-3], 1.0)
from collections import defaultdict
import numpy as np
import pytest
from pystencils import CreateKernelConfig, Target, Backend
from pystencils.typing import BasicType
def test_config():
# targets
config = CreateKernelConfig(target=Target.CPU)
assert config.target == Target.CPU
assert config.backend == Backend.C
config = CreateKernelConfig(target=Target.GPU)
assert config.target == Target.GPU
assert config.backend == Backend.CUDA
# typing
config = CreateKernelConfig(data_type=np.float64)
assert isinstance(config.data_type, defaultdict)
assert config.data_type.default_factory() == BasicType('float64')
assert config.default_number_float == BasicType('float64')
assert config.default_number_int == BasicType('int64')
config = CreateKernelConfig(data_type=np.float32)
assert isinstance(config.data_type, defaultdict)
assert config.data_type.default_factory() == BasicType('float32')
assert config.default_number_float == BasicType('float32')
assert config.default_number_int == BasicType('int64')
config = CreateKernelConfig(data_type=np.float32, default_number_float=np.float64)
assert isinstance(config.data_type, defaultdict)
assert config.data_type.default_factory() == BasicType('float32')
assert config.default_number_float == BasicType('float64')
assert config.default_number_int == BasicType('int64')
config = CreateKernelConfig(data_type=np.float32, default_number_float=np.float64, default_number_int=np.int16)
assert isinstance(config.data_type, defaultdict)
assert config.data_type.default_factory() == BasicType('float32')
assert config.default_number_float == BasicType('float64')
assert config.default_number_int == BasicType('int16')
config = CreateKernelConfig(data_type='float64')
assert isinstance(config.data_type, defaultdict)
assert config.data_type.default_factory() == BasicType('float64')
assert config.default_number_float == BasicType('float64')
assert config.default_number_int == BasicType('int64')
config = CreateKernelConfig(data_type={'a': np.float64, 'b': np.float32})
assert isinstance(config.data_type, defaultdict)
assert config.data_type.default_factory() == BasicType('float64')
assert config.default_number_float == BasicType('float64')
assert config.default_number_int == BasicType('int64')
config = CreateKernelConfig(data_type={'a': np.float32, 'b': np.int32})
assert isinstance(config.data_type, defaultdict)
assert config.data_type.default_factory() == BasicType('float32')
assert config.default_number_float == BasicType('float32')
assert config.default_number_int == BasicType('int64')
def test_config_target_as_string():
with pytest.raises(ValueError):
CreateKernelConfig(target='cpu')
def test_config_backend_as_string():
with pytest.raises(ValueError):
CreateKernelConfig(backend='C')
def test_config_python_types():
with pytest.raises(ValueError):
CreateKernelConfig(data_type=float)
def test_config_python_types2():
with pytest.raises(ValueError):
CreateKernelConfig(data_type={'a': float})
def test_config_python_types3():
with pytest.raises(ValueError):
CreateKernelConfig(default_number_float=float)
def test_config_python_types4():
with pytest.raises(ValueError):
CreateKernelConfig(default_number_int=int)
def test_config_python_types5():
with pytest.raises(ValueError):
CreateKernelConfig(data_type="float")
def test_config_python_types6():
with pytest.raises(ValueError):
CreateKernelConfig(default_number_float="float")
def test_config_python_types7():
dtype = defaultdict(lambda: 'float', {'a': np.float64, 'b': np.int64})
with pytest.raises(ValueError):
CreateKernelConfig(data_type=dtype)
def test_config_python_types8():
dtype = defaultdict(lambda: float, {'a': np.float64, 'b': np.int64})
with pytest.raises(ValueError):
CreateKernelConfig(data_type=dtype)
def test_config_python_types9():
dtype = defaultdict(lambda: 'float32', {'a': 'float', 'b': np.int64})
with pytest.raises(ValueError):
CreateKernelConfig(data_type=dtype)
def test_config_python_types10():
dtype = defaultdict(lambda: 'float32', {'a': float, 'b': np.int64})
with pytest.raises(ValueError):
CreateKernelConfig(data_type=dtype)