# -*- coding: utf-8 -*-
#
# Copyright © 2019 Stephan Seitz <stephan.seitz@fau.de>
#
# Distributed under terms of the GPLv3 license.
"""
"""
import pytest
import numpy as np
import pystencils.config
import sympy as sp
import sympy.abc
import pystencils as ps
from pystencils.typing import create_type
@pytest.mark.parametrize('dtype', ["float64", "float32"])
def test_sum(dtype):
sum = sp.Sum(sp.abc.k, (sp.abc.k, 1, 100))
expanded_sum = sum.doit()
# print(sum)
# print(expanded_sum)
x = ps.fields(f'x: {dtype}[1d]')
assignments = ps.AssignmentCollection({x.center(): sum})
ast = ps.create_kernel(assignments)
code = ps.get_code_str(ast)
kernel = ast.compile()
# ps.show_code(ast)
if dtype == "float32":
assert "5050.0f;" in code
array = np.zeros((10,), dtype=dtype)
kernel(x=array)
assert np.allclose(array, int(expanded_sum) * np.ones_like(array))
@pytest.mark.parametrize('dtype', ["int32", "int64", "float64", "float32"])
def test_product(dtype):
k = ps.TypedSymbol('k', create_type(dtype))
sum = sympy.Product(k, (k, 1, 10))
expanded_sum = sum.doit()
# print(sum)
# print(expanded_sum)
x = ps.fields(f'x: {dtype}[1d]')
assignments = ps.AssignmentCollection({x.center(): sum})
config = pystencils.config.CreateKernelConfig()
ast = ps.create_kernel(assignments, config=config)
code = ps.get_code_str(ast)
kernel = ast.compile()
# print(code)
if dtype == "int64" or dtype == "int32":
assert '3628800;' in code
elif dtype == "float32":
assert '3628800.0f;' in code
else:
assert '3628800.0;' in code
array = np.zeros((10,), dtype=dtype)
kernel(x=array)
assert np.allclose(array, int(expanded_sum) * np.ones_like(array))
# TODO: See Issue !55
# def test_prod_var_limit():
#
# k = ps.TypedSymbol('k', create_type('int64'))
# limit = ps.TypedSymbol('limit', create_type('int64'))
#
# sum = sympy.Sum(k, (k, 1, limit))
# expanded_sum = sum.replace(limit, 100).doit()
#
# print(sum)
# print(expanded_sum)
#
# x = ps.fields('x: int64[1d]')
#
# assignments = ps.AssignmentCollection({x.center(): sum})
#
# ast = ps.create_kernel(assignments)
# ps.show_code(ast)
# kernel = ast.compile()
#
# array = np.zeros((10,), np.int64)
#
# kernel(x=array, limit=100)
#
# assert np.allclose(array, int(expanded_sum) * np.ones_like(array))
import sympy
import numpy as np
import sympy as sp
import pystencils
from pystencils.sympyextensions import replace_second_order_products
from pystencils.sympyextensions import remove_higher_order_terms
from pystencils.sympyextensions import complete_the_squares_in_exp
from pystencils.sympyextensions import extract_most_common_factor
from pystencils.sympyextensions import simplify_by_equality
from pystencils.sympyextensions import count_operations
from pystencils.sympyextensions import common_denominator
from pystencils.sympyextensions import get_symmetric_part
from pystencils.sympyextensions import scalar_product
from pystencils.sympyextensions import kronecker_delta
from pystencils import Assignment
from pystencils.functions import DivFunc
from pystencils.fast_approximation import (fast_division, fast_inv_sqrt, fast_sqrt,
insert_fast_divisions, insert_fast_sqrts)
def test_utility():
a = [1, 2]
b = (2, 3)
a_np = np.array(a)
b_np = np.array(b)
assert scalar_product(a, b) == np.dot(a_np, b_np)
a = sympy.Symbol("a")
b = sympy.Symbol("b")
assert kronecker_delta(a, a, a, b) == 0
assert kronecker_delta(a, a, a, a) == 1
assert kronecker_delta(3, 3, 3, 2) == 0
assert kronecker_delta(2, 2, 2, 2) == 1
assert kronecker_delta([10] * 100) == 1
assert kronecker_delta((0, 1), (0, 1)) == 1
def test_replace_second_order_products():
x, y = sympy.symbols('x y')
expr = 4 * x * y
expected_expr_positive = 2 * ((x + y) ** 2 - x ** 2 - y ** 2)
expected_expr_negative = 2 * (-(x - y) ** 2 + x ** 2 + y ** 2)
result = replace_second_order_products(expr, search_symbols=[x, y], positive=True)
assert result == expected_expr_positive
assert (result - expected_expr_positive).simplify() == 0
result = replace_second_order_products(expr, search_symbols=[x, y], positive=False)
assert result == expected_expr_negative
assert (result - expected_expr_negative).simplify() == 0
result = replace_second_order_products(expr, search_symbols=[x, y], positive=None)
assert result == expected_expr_positive
a = [Assignment(sympy.symbols('z'), x + y)]
replace_second_order_products(expr, search_symbols=[x, y], positive=True, replace_mixed=a)
assert len(a) == 2
assert replace_second_order_products(4 + y, search_symbols=[x, y]) == y + 4
def test_remove_higher_order_terms():
x, y = sympy.symbols('x y')
expr = sympy.Mul(x, y)
result = remove_higher_order_terms(expr, order=1, symbols=[x, y])
assert result == 0
result = remove_higher_order_terms(expr, order=2, symbols=[x, y])
assert result == expr
expr = sympy.Pow(x, 3)
result = remove_higher_order_terms(expr, order=2, symbols=[x, y])
assert result == 0
result = remove_higher_order_terms(expr, order=3, symbols=[x, y])
assert result == expr
def test_complete_the_squares_in_exp():
a, b, c, s, n = sympy.symbols('a b c s n')
expr = a * s ** 2 + b * s + c
result = complete_the_squares_in_exp(expr, symbols_to_complete=[s])
assert result == expr
expr = sympy.exp(a * s ** 2 + b * s + c)
expected_result = sympy.exp(a*s**2 + c - b**2 / (4*a))
result = complete_the_squares_in_exp(expr, symbols_to_complete=[s])
assert result == expected_result
def test_extract_most_common_factor():
x, y = sympy.symbols('x y')
expr = 1 / (x + y) + 3 / (x + y) + 3 / (x + y)
most_common_factor = extract_most_common_factor(expr)
assert most_common_factor[0] == 7
assert sympy.prod(most_common_factor) == expr
expr = 1 / x + 3 / (x + y) + 3 / y
most_common_factor = extract_most_common_factor(expr)
assert most_common_factor[0] == 3
assert sympy.prod(most_common_factor) == expr
expr = 1 / x
most_common_factor = extract_most_common_factor(expr)
assert most_common_factor[0] == 1
assert sympy.prod(most_common_factor) == expr
assert most_common_factor[1] == expr
def test_count_operations():
x, y, z = sympy.symbols('x y z')
expr = 1/x + y * sympy.sqrt(z)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 1
assert ops['muls'] == 1
assert ops['divs'] == 1
assert ops['sqrts'] == 1
expr = 1 / sympy.sqrt(z)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 0
assert ops['muls'] == 0
assert ops['divs'] == 1
assert ops['sqrts'] == 1
expr = sympy.Rel(1 / sympy.sqrt(z), 5)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 0
assert ops['muls'] == 0
assert ops['divs'] == 1
assert ops['sqrts'] == 1
expr = sympy.sqrt(x + y)
expr = insert_fast_sqrts(expr).atoms(fast_sqrt)
ops = count_operations(*expr, only_type=None)
assert ops['fast_sqrts'] == 1
expr = sympy.sqrt(x / y)
expr = insert_fast_divisions(expr).atoms(fast_division)
ops = count_operations(*expr, only_type=None)
assert ops['fast_div'] == 1
expr = pystencils.Assignment(sympy.Symbol('tmp'), 3 / sympy.sqrt(x + y))
expr = insert_fast_sqrts(expr).atoms(fast_inv_sqrt)
ops = count_operations(*expr, only_type=None)
assert ops['fast_inv_sqrts'] == 1
expr = sympy.Piecewise((1.0, x > 0), (0.0, True)) + y * z
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 1
expr = sympy.Pow(1/x + y * sympy.sqrt(z), 100)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 1
assert ops['muls'] == 99
assert ops['divs'] == 1
assert ops['sqrts'] == 1
expr = DivFunc(x, y)
ops = count_operations(expr, only_type=None)
assert ops['divs'] == 1
expr = DivFunc(x + z, y + z)
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 2
assert ops['divs'] == 1
expr = sp.UnevaluatedExpr(sp.Mul(*[x]*100, evaluate=False))
ops = count_operations(expr, only_type=None)
assert ops['muls'] == 99
expr = DivFunc(1, sp.UnevaluatedExpr(sp.Mul(*[x]*100, evaluate=False)))
ops = count_operations(expr, only_type=None)
assert ops['divs'] == 1
assert ops['muls'] == 99
expr = DivFunc(y + z, sp.UnevaluatedExpr(sp.Mul(*[x]*100, evaluate=False)))
ops = count_operations(expr, only_type=None)
assert ops['adds'] == 1
assert ops['divs'] == 1
assert ops['muls'] == 99
def test_common_denominator():
x = sympy.symbols('x')
expr = sympy.Rational(1, 2) + x * sympy.Rational(2, 3)
cm = common_denominator(expr)
assert cm == 6
def test_get_symmetric_part():
x, y, z = sympy.symbols('x y z')
expr = x / 9 - y ** 2 / 6 + z ** 2 / 3 + z / 3
expected_result = x / 9 - y ** 2 / 6 + z ** 2 / 3
sym_part = get_symmetric_part(expr, sympy.symbols('y z'))
assert sym_part == expected_result
def test_simplify_by_equality():
x, y, z = sp.symbols('x, y, z')
p, q = sp.symbols('p, q')
# Let x = y + z
expr = x * p - y * p + z * q
expr = simplify_by_equality(expr, x, y, z)
assert expr == z * p + z * q
expr = x * (p - 2 * q) + 2 * q * z
expr = simplify_by_equality(expr, x, y, z)
assert expr == x * p - 2 * q * y
expr = x * (y + z) - y * z
expr = simplify_by_equality(expr, x, y, z)
assert expr == x*y + z**2
# Let x = y + 2
expr = x * p - 2 * p
expr = simplify_by_equality(expr, x, y, 2)
assert expr == y * p
import time
import numpy as np
from pystencils import Assignment
from pystencils import create_kernel
from pystencils.datahandling import create_data_handling
from pystencils.timeloop import TimeLoop
def test_timeloop():
dh = create_data_handling(domain_size=(2, 2), periodicity=True)
pre = dh.add_array('pre_run_field', values_per_cell=1)
dh.fill("pre_run_field", 0.0, ghost_layers=True)
f = dh.add_array('field', values_per_cell=1)
dh.fill("field", 0.0, ghost_layers=True)
post = dh.add_array('post_run_field', values_per_cell=1)
dh.fill("post_run_field", 0.0, ghost_layers=True)
single_step = dh.add_array('single_step_field', values_per_cell=1)
dh.fill("single_step_field", 0.0, ghost_layers=True)
pre_assignments = Assignment(pre.center, pre.center + 1)
pre_kernel = create_kernel(pre_assignments).compile()
assignments = Assignment(f.center, f.center + 1)
kernel = create_kernel(assignments).compile()
post_assignments = Assignment(post.center, post.center + 1)
post_kernel = create_kernel(post_assignments).compile()
single_step_assignments = Assignment(single_step.center, single_step.center + 1)
single_step_kernel = create_kernel(single_step_assignments).compile()
fixed_steps = 2
timeloop = TimeLoop(steps=fixed_steps)
assert timeloop.fixed_steps == fixed_steps
def pre_run():
dh.run_kernel(pre_kernel)
def post_run():
dh.run_kernel(post_kernel)
def single_step_run():
dh.run_kernel(single_step_kernel)
timeloop.add_pre_run_function(pre_run)
timeloop.add_post_run_function(post_run)
timeloop.add_single_step_function(single_step_run)
timeloop.add_call(kernel, {'field': dh.cpu_arrays["field"]})
# The timeloop was initialised with fixed_steps=2, i.e. one main iteration advances two time steps.
# Running 5 time steps therefore results in 2 main iterations plus 1 single-step iteration in this configuration.
timeloop.run(time_steps=5)
assert np.all(dh.cpu_arrays["pre_run_field"] == 1.0)
assert np.all(dh.cpu_arrays["field"] == 2.0)
assert np.all(dh.cpu_arrays["single_step_field"] == 1.0)
assert np.all(dh.cpu_arrays["post_run_field"] == 1.0)
seconds = 2
start = time.perf_counter()
timeloop.run_time_span(seconds=seconds)
end = time.perf_counter()
# Asserting on the measured wall-clock time is flaky, so the comparison is only printed here.
# np.testing.assert_almost_equal(seconds, end - start, decimal=2)
print("timeloop: ", seconds, " own measurement: ", end - start)
import sympy as sp
import numpy as np
import pystencils as ps
from pystencils import fields, TypedSymbol
from pystencils.astnodes import LoopOverCoordinate, SympyAssignment
from pystencils.typing import create_type
from pystencils.transformations import (
filtered_tree_iteration, get_loop_hierarchy, get_loop_counter_symbol_hierarchy,
iterate_loops_by_depth, split_inner_loop, loop_blocking
)
from pystencils.cpu import add_pragmas
def test_loop_information():
f, g = ps.fields("f, g: double[2D]")
update_rule = ps.Assignment(g[0, 0], f[0, 0])
ast = ps.create_kernel(update_rule)
inner_loops = [loop for loop in filtered_tree_iteration(ast, LoopOverCoordinate, stop_type=SympyAssignment)
if loop.is_innermost_loop]
loop_order = []
for i in get_loop_hierarchy(inner_loops[0].args[0]):
loop_order.append(i)
assert loop_order == [0, 1]
loop_symbols = get_loop_counter_symbol_hierarchy(inner_loops[0].args[0])
assert loop_symbols == [TypedSymbol("ctr_1", create_type("int"), nonnegative=True),
TypedSymbol("ctr_0", create_type("int"), nonnegative=True)]
def test_iterate_loops_by_depth():
f, g = ps.fields("f, g: double[3D]", layout="fzyx")
x = ps.TypedSymbol('x', np.float64)
subs = [ps.Assignment(x, f[0, 0, 0])]
mains = [ps.Assignment(g[0, 0, 0], x)]
ac = ps.AssignmentCollection(mains, subexpressions=subs)
config = ps.CreateKernelConfig(cpu_blocking=(0, 16, 0))
ast = ps.create_kernel(ac, config=config)
split_inner_loop(ast, [[x], [g[0,0,0]]])
loops = list(iterate_loops_by_depth(ast, 0))
assert len(loops) == 1
assert loops[0].loop_counter_symbol.name == "_blockctr_1"
loops = list(iterate_loops_by_depth(ast, 1))
assert len(loops) == 1
assert loops[0].loop_counter_symbol.name == "ctr_2"
loops = list(iterate_loops_by_depth(ast, 2))
assert len(loops) == 1
assert loops[0].loop_counter_symbol.name == "ctr_1"
loops = list(iterate_loops_by_depth(ast, 3))
assert len(loops) == 2
assert loops[0].loop_counter_symbol.name == "ctr_0"
assert loops[1].loop_counter_symbol.name == "ctr_0"
innermost = list(iterate_loops_by_depth(ast, -1))
assert loops == innermost
def test_split_optimisation():
src, dst = fields(f"src(9), dst(9): [2D]", layout='fzyx')
stencil = ((0, 0), (0, 1), (0, -1), (-1, 0), (1, 0), (-1, 1), (1, 1), (-1, -1), (1, -1))
w = [sp.Rational(4, 9)]
w += [sp.Rational(1, 9)] * 4
w += [sp.Rational(1, 36)] * 4
cs = sp.Rational(1, 3)
subexpressions = []
main_assignments = []
rho = sp.symbols("rho")
velo = sp.symbols("u_:2")
density = 0
velocity_x = 0
velocity_y = 0
for d in stencil:
density += src[d]
velocity_x += d[0] * src[d]
velocity_y += d[1] * src[d]
subexpressions.append(ps.Assignment(rho, density))
subexpressions.append(ps.Assignment(velo[0], velocity_x))
subexpressions.append(ps.Assignment(velo[1], velocity_y))
for i, d in enumerate(stencil):
u_d = velo[0] * d[0] + velo[1] * d[1]
u_2 = velo[0] * velo[0] + velo[1] * velo[1]
expr = w[i] * rho * (1 + u_d / cs + u_d ** 2 / (2 * cs ** 2) - u_2 / (2 * cs))
main_assignments.append(ps.Assignment(dst.center_vector[i], expr))
ac = ps.AssignmentCollection(main_assignments=main_assignments, subexpressions=subexpressions)
simplification_hint = {'density': rho,
'velocity': (velo[0], velo[1]),
'split_groups': [[rho, velo[0], velo[1], dst.center_vector[0]],
[dst.center_vector[1], dst.center_vector[2]],
[dst.center_vector[3], dst.center_vector[4]],
[dst.center_vector[5], dst.center_vector[6]],
[dst.center_vector[7], dst.center_vector[8]]]}
ac.simplification_hints = simplification_hint
ast = ps.create_kernel(ac)
code = ps.get_code_str(ast)
# after the split optimisation the two for loops are split into 6
assert code.count("for") == 6
print(code)
def test_pragmas():
f, g = ps.fields("f, g: double[3D]", layout="fzyx")
x = ps.TypedSymbol('x', np.float64)
subs = [ps.Assignment(x, f[0, 0, 0])]
mains = [ps.Assignment(g[0, 0, 0], x)]
ac = ps.AssignmentCollection(mains, subexpressions=subs)
def prepend_omp_pragmas(ast):
add_pragmas(ast, ["#pragma omp for schedule(dynamic)"], nesting_depth=0)
add_pragmas(ast, ["#pragma omp simd simdlen(8)"], nesting_depth=-1)
ast_passes = [prepend_omp_pragmas]
config = ps.CreateKernelConfig(target=ps.Target.CPU, cpu_prepend_optimizations=ast_passes)
ast = ps.create_kernel(ac, config=config)
code = ps.get_code_str(ast)
assert code.find("#pragma omp for schedule(dynamic)") != -1
assert code.find("#pragma omp simd simdlen(8)") != -1
loops = [loop for loop in filtered_tree_iteration(ast, LoopOverCoordinate, stop_type=SympyAssignment)]
innermost = list(filter(lambda n: n.is_innermost_loop, loops))
assert innermost[0].prefix_lines == ["#pragma omp simd simdlen(8)"]
outermost = list(filter(lambda n: n.is_outermost_loop, loops))
assert outermost[0].prefix_lines == ["#pragma omp for schedule(dynamic)"]
from sympy.abc import a, b, c, d, e, f, g
import pystencils
from pystencils.typing import CastFunc, create_type
def test_type_interference():
x = pystencils.fields('x: float32[3d]')
assignments = pystencils.AssignmentCollection({
a: CastFunc(10, create_type('float64')),
b: CastFunc(10, create_type('uint16')),
e: 11,
c: b,
f: c + b,
d: c + b + x.center + e,
x.center: c + b + x.center,
g: a + b + d
})
ast = pystencils.create_kernel(assignments)
code = pystencils.get_code_str(ast)
# print(code)
assert 'const double a' in code
assert 'const uint16_t b' in code
assert 'const uint16_t f' in code
assert 'const int64_t e' in code
assert 'const float d = ((float)(b)) + ((float)(c)) + ((float)(e)) + ' \
'_data_x[_stride_x_0*ctr_0 + _stride_x_1*ctr_1 + _stride_x_2*ctr_2];' in code
assert '_data_x[_stride_x_0*ctr_0 + _stride_x_1*ctr_1 + _stride_x_2*ctr_2] = (' \
'(float)(b)) + ((float)(c)) + _data_x[_stride_x_0*ctr_0 + _stride_x_1*ctr_1 + _stride_x_2*ctr_2];' in code
assert 'const double g = a + ((double)(b)) + ((double)(d));' in code
import pytest
import pystencils.config
import sympy as sp
import numpy as np
import pystencils as ps
from pystencils.typing import TypedSymbol, get_type_of_expression, VectorType, collate_types, \
typed_symbols, CastFunc, PointerArithmeticFunc, PointerType, result_type, BasicType
def test_result_type():
i = np.dtype('int32')
l = np.dtype('int64')
ui = np.dtype('uint32')
ul = np.dtype('uint64')
f = np.dtype('float32')
d = np.dtype('float64')
b = np.dtype('bool')
assert result_type(i, l) == l
assert result_type(l, i) == l
assert result_type(ui, i) == i
assert result_type(ui, l) == l
assert result_type(ul, i) == i
assert result_type(ul, l) == l
assert result_type(d, f) == d
assert result_type(f, d) == d
assert result_type(i, f) == f
assert result_type(l, f) == f
assert result_type(ui, f) == f
assert result_type(ul, f) == f
assert result_type(i, d) == d
assert result_type(l, d) == d
assert result_type(ui, d) == d
assert result_type(ul, d) == d
assert result_type(b, i) == i
assert result_type(b, l) == l
assert result_type(b, ui) == ui
assert result_type(b, ul) == ul
assert result_type(b, f) == f
assert result_type(b, d) == d
@pytest.mark.parametrize('dtype', ('float64', 'float32', 'int64', 'int32', 'uint32', 'uint64'))
def test_simple_add(dtype):
constant = 1.0
if dtype[0] in 'ui':
constant = 1
f = ps.fields(f"f: {dtype}[1D]")
d = TypedSymbol("d", dtype)
test_arr = np.array([constant], dtype=dtype)
ur = ps.Assignment(f[0], f[0] + d)
ast = ps.create_kernel(ur)
code = ps.get_code_str(ast)
kernel = ast.compile()
kernel(f=test_arr, d=constant)
assert test_arr[0] == constant+constant
@pytest.mark.parametrize('dtype1', ('float64', 'float32', 'int64', 'int32', 'uint32', 'uint64'))
@pytest.mark.parametrize('dtype2', ('float64', 'float32', 'int64', 'int32', 'uint32', 'uint64'))
def test_mixed_add(dtype1, dtype2):
constant = 1
f = ps.fields(f"f: {dtype1}[1D]")
g = ps.fields(f"g: {dtype2}[1D]")
test_f = np.array([constant], dtype=dtype1)
test_g = np.array([constant], dtype=dtype2)
ur = ps.Assignment(f[0], f[0] + g[0])
# TODO Markus: check the logging if collate_types(dtype1, dtype2) != dtype1
ast = ps.create_kernel(ur)
code = ps.get_code_str(ast)
kernel = ast.compile()
kernel(f=test_f, g=test_g)
assert test_f[0] == constant+constant
def test_collation():
double_type = BasicType('float64')
float_type = BasicType('float32')
double4_type = VectorType(double_type, 4)
float4_type = VectorType(float_type, 4)
assert collate_types([double_type, float_type]) == double_type
assert collate_types([double4_type, float_type]) == double4_type
assert collate_types([double4_type, float4_type]) == double4_type
def test_vector_type():
double_type = BasicType('float64')
float_type = BasicType('float32')
double4_type = VectorType(double_type, 4)
float4_type = VectorType(float_type, 4)
assert double4_type.item_size == 4
assert float4_type.item_size == 4
double4_type2 = VectorType(double_type, 4)
assert double4_type == double4_type2
assert double4_type != 4
assert double4_type != float4_type
def test_pointer_type():
double_type = BasicType('float64')
float_type = BasicType('float32')
double4_type = PointerType(double_type, restrict=True)
float4_type = PointerType(float_type, restrict=False)
assert double4_type.item_size == 1
assert float4_type.item_size == 1
assert not double4_type == 4
assert not double4_type.alias
assert float4_type.alias
def test_dtype_of_constants():
# Some constants, e.g. pi, are neither of type Integer, Float nor Rational and don't have args
# >>> isinstance(pi, Integer)
# False
# >>> isinstance(pi, Float)
# False
# >>> isinstance(pi, Rational)
# False
# >>> pi.args
# ()
get_type_of_expression(sp.pi)
def test_assumptions():
x = ps.fields('x: float32[3d]')
assert x.shape[0].is_nonnegative
assert (2 * x.shape[0]).is_nonnegative
assert (2 * x.shape[0]).is_integer
assert (TypedSymbol('a', BasicType('uint64'))).is_nonnegative
assert (TypedSymbol('a', BasicType('uint64'))).is_positive is None
assert (TypedSymbol('a', BasicType('uint64')) + 1).is_positive
assert (x.shape[0] + 1).is_real
@pytest.mark.parametrize('dtype', ('float64', 'float32'))
def test_sqrt_of_integer(dtype):
"""Regression test for bug where sqrt(3) was classified as integer"""
f = ps.fields(f'f: {dtype}[1D]')
tmp = sp.symbols('tmp')
assignments = [ps.Assignment(tmp, sp.sqrt(3)),
ps.Assignment(f[0], tmp)]
arr = np.array([1], dtype=dtype)
config = pystencils.config.CreateKernelConfig(data_type=dtype, default_number_float=dtype)
ast = ps.create_kernel(assignments, config=config)
kernel = ast.compile()
kernel(f=arr)
assert 1.7 < arr[0] < 1.8
code = ps.get_code_str(ast)
constant = '1.7320508075688772f'
if dtype == 'float32':
assert constant in code
else:
assert constant not in code
@pytest.mark.parametrize('dtype', ('float64', 'float32'))
def test_integer_comparison(dtype):
f = ps.fields(f"f: {dtype}[2D]")
d = TypedSymbol("dir", "int64")
ur = ps.Assignment(f[0, 0], sp.Piecewise((0, sp.Equality(d, 1)), (f[0, 0], True)))
ast = ps.create_kernel(ur)
code = ps.get_code_str(ast)
# There should be an explicit cast for the integer zero to the type of the field on the rhs
if dtype == 'float64':
t = "_data_f[_stride_f_0*ctr_0 + _stride_f_1*ctr_1] = " \
"((((dir) == (1))) ? (0.0): (_data_f[_stride_f_0*ctr_0 + _stride_f_1*ctr_1]));"
else:
t = "_data_f[_stride_f_0*ctr_0 + _stride_f_1*ctr_1] = " \
"((((dir) == (1))) ? (0.0f): (_data_f[_stride_f_0*ctr_0 + _stride_f_1*ctr_1]));"
assert t in code
def test_typed_symbols_dtype():
assert typed_symbols(("s", "f"), np.uint) == typed_symbols("s, f", np.uint)
t_symbols = typed_symbols(("s", "f"), np.uint)
s = t_symbols[0]
assert t_symbols[0] == TypedSymbol("s", np.uint)
assert s.dtype.is_uint()
assert typed_symbols("s", np.float64).dtype.c_name == 'double'
assert typed_symbols("s", np.float32).dtype.c_name == 'float'
assert TypedSymbol("s", np.uint).canonical == TypedSymbol("s", np.uint)
assert TypedSymbol("s", np.uint).reversed == TypedSymbol("s", np.uint)
def test_cast_func():
assert CastFunc(TypedSymbol("s", np.uint), np.int64).canonical == TypedSymbol("s", np.uint).canonical
a = CastFunc(5, np.uint)
assert a.is_negative is False
assert a.is_nonnegative
def test_pointer_arithmetic_func():
assert PointerArithmeticFunc(TypedSymbol("s", np.uint), 1).canonical == TypedSymbol("s", np.uint).canonical
def test_division():
f = ps.fields('f(10): float32[2D]')
m, tau = sp.symbols("m, tau")
up = [ps.Assignment(tau, 1 / (0.5 + (3.0 * m))),
ps.Assignment(f.center, tau)]
config = pystencils.config.CreateKernelConfig(data_type='float32', default_number_float='float32')
ast = ps.create_kernel(up, config=config)
code = ps.get_code_str(ast)
assert "((1.0f) / (m*3.0f + 0.5f))" in code
def test_pow():
f = ps.fields('f(10): float32[2D]')
m, tau = sp.symbols("m, tau")
up = [ps.Assignment(tau, m ** 1.5),
ps.Assignment(f.center, tau)]
config = pystencils.config.CreateKernelConfig(data_type="float32", default_number_float='float32')
ast = ps.create_kernel(up, config=config)
code = ps.get_code_str(ast)
assert "1.5f" in code
import pytest
import sympy as sp
from pystencils.utils import LinearEquationSystem
from pystencils.utils import DotDict
def test_linear_equation_system():
...
with pytest.raises(ValueError) as e:
m.add_equation(x**2 - 1)
assert 'Not a linear equation' in str(e.value)
x, y, z = sp.symbols("x, y, z")
les = LinearEquationSystem([x, y, z])
les.add_equation(1 * x + 2 * y - 1 * z + 4)
les.add_equation(2 * x + 1 * y + 1 * z - 2)
les.add_equation(1 * x + 2 * y + 1 * z + 2)
# Usually calling reduce() is not necessary, since almost every method of LinearEquationSystem calls it beforehand
les.reduce()
expected_matrix = sp.Matrix([[1, 0, 0, sp.Rational(5, 3)],
[0, 1, 0, sp.Rational(-7, 3)],
[0, 0, 1, sp.Integer(1)]])
assert les.matrix == expected_matrix
assert les.rank == 3
sol = les.solution()
assert sol[x] == sp.Rational(5, 3)
assert sol[y] == sp.Rational(-7, 3)
assert sol[z] == sp.Integer(1)
les = LinearEquationSystem([x, y])
assert les.solution_structure() == 'multiple'
les.add_equation(x + 1)
assert les.solution_structure() == 'multiple'
les.add_equation(y + 2)
assert les.solution_structure() == 'single'
les.add_equation(x + y + 5)
assert les.solution_structure() == 'none'
def test_dot_dict():
d = {'a': {'c': 7}, 'b': 6}
t = DotDict(d)
assert t.a.c == 7
assert t.b == 6
assert len(t) == 2
delattr(t, 'b')
assert len(t) == 1
t.b = 6
assert len(t) == 2
assert t.b == 6
import numpy as np
import pytest
import pystencils.config
import sympy as sp
import pystencils as ps
import pystencils.astnodes as ast
from pystencils.backends.simd_instruction_sets import get_supported_instruction_sets, get_vector_instruction_set
from pystencils.cpu.vectorization import vectorize
from pystencils.enums import Target
from pystencils.transformations import replace_inner_stride_with_one
supported_instruction_sets = get_supported_instruction_sets()
if supported_instruction_sets:
instruction_set = supported_instruction_sets[-1]
else:
instruction_set = None
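# The tests below default to the last entry of get_supported_instruction_sets(),
# which is presumably the widest instruction set available on this machine.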
# TODO: Skip the tests if no instruction set is available and check all generated codes to make sure they are really vectorised!
def test_vector_type_propagation1(instruction_set=instruction_set):
a, b, c, d, e = sp.symbols("a b c d e")
arr = np.ones((2 ** 2 + 2, 2 ** 3 + 2))
arr *= 10.0
f, g = ps.fields(f=arr, g=arr)
update_rule = [ps.Assignment(a, f[1, 0]),
ps.Assignment(b, a),
ps.Assignment(g[0, 0], b + 3 + f[0, 1])]
ast = ps.create_kernel(update_rule)
vectorize(ast, instruction_set=instruction_set)
# ps.show_code(ast)
func = ast.compile()
dst = np.zeros_like(arr)
func(g=dst, f=arr)
np.testing.assert_equal(dst[1:-1, 1:-1], 2 * 10.0 + 3)
def test_vector_type_propagation2(instruction_set=instruction_set):
data_type = 'float32'
assume_aligned = True
assume_inner_stride_one = True
assume_sufficient_line_padding = True
domain_size = (24, 24)
dh = ps.create_data_handling(domain_size, default_target=Target.CPU)
# fields
g = dh.add_array("g", values_per_cell=1, dtype=data_type, alignment=True, ghost_layers=0)
f = dh.add_array("f", values_per_cell=1, dtype=data_type, alignment=True, ghost_layers=0)
dh.fill("g", 1.0, ghost_layers=True)
dh.fill("f", 0.5, ghost_layers=True)
collision = [
ps.Assignment(sp.Symbol("b"), sp.sqrt(sp.Symbol("a") * (1 - (1 - f.center)**2))),
ps.Assignment(g.center, sp.Symbol("b"))
]
config = ps.CreateKernelConfig(
target=ps.Target.CPU,
data_type=data_type,
default_number_float=data_type,
cpu_vectorize_info={
'assume_aligned': assume_aligned,
'assume_inner_stride_one': assume_inner_stride_one,
'assume_sufficient_line_padding': assume_sufficient_line_padding,
},
ghost_layers=0
)
ast = ps.create_kernel(collision, config=config)
print(ast)
code = ps.get_code_str(ast)
print(code)
kernel = ast.compile()
dh.run_kernel(kernel, a=4)
g_arr = dh.cpu_arrays['g']
np.testing.assert_allclose(g_arr, np.full_like(g_arr, np.sqrt(3)))
def test_vectorize_moved_constants1(instruction_set=instruction_set):
opt = {'instruction_set': instruction_set, 'assume_inner_stride_one': True}
f = ps.fields("f: [1D]")
x = ast.TypedSymbol("x", np.float64)
kernel_func = ps.create_kernel(
[ast.SympyAssignment(x, 2.0), ast.SympyAssignment(f[0], x)],
cpu_prepend_optimizations=[ps.transformations.move_constants_before_loop], # explicitly move constants
cpu_vectorize_info=opt,
)
ps.show_code(kernel_func) # fails if `x` on rhs was not correctly vectorized
kernel = kernel_func.compile()
f_arr = np.zeros(9)
kernel(f=f_arr)
assert(np.all(f_arr == 2))
def test_vectorize_moved_constants2(instruction_set=instruction_set):
opt = {'instruction_set': instruction_set, 'assume_inner_stride_one': True}
f = ps.fields("f: [1D]")
x = ast.TypedSymbol("x", np.float64)
y = ast.TypedSymbol("y", np.float64)
kernel_func = ps.create_kernel(
[ast.SympyAssignment(x, 2.0), ast.SympyAssignment(y, 3.0), ast.SympyAssignment(f[0], x + y)],
cpu_prepend_optimizations=[ps.transformations.move_constants_before_loop], # explicitly move constants
cpu_vectorize_info=opt,
)
ps.show_code(kernel_func) # fails if `x` on rhs was not correctly vectorized
kernel = kernel_func.compile()
f_arr = np.zeros(9)
kernel(f=f_arr)
assert(np.all(f_arr == 5))
@pytest.mark.parametrize('openmp', [True, False])
def test_aligned_and_nt_stores(openmp, instruction_set=instruction_set):
domain_size = (24, 24)
# create a datahandling object
dh = ps.create_data_handling(domain_size, periodicity=(True, True), parallel=False, default_target=Target.CPU)
# fields
alignment = 'cacheline' if openmp else True
g = dh.add_array("g", values_per_cell=1, alignment=alignment)
dh.fill("g", 1.0, ghost_layers=True)
f = dh.add_array("f", values_per_cell=1, alignment=alignment)
dh.fill("f", 0.0, ghost_layers=True)
opt = {'instruction_set': instruction_set, 'assume_aligned': True, 'nontemporal': True,
'assume_inner_stride_one': True}
update_rule = [ps.Assignment(f.center(), 0.25 * (g[-1, 0] + g[1, 0] + g[0, -1] + g[0, 1]))]
# Without the base pointer spec, the inner store is not aligned
config = pystencils.config.CreateKernelConfig(target=dh.default_target, cpu_vectorize_info=opt, cpu_openmp=openmp)
ast = ps.create_kernel(update_rule, config=config)
if instruction_set in ['sse'] or instruction_set.startswith('avx') or instruction_set.startswith('sve'):
assert 'stream' in ast.instruction_set
assert 'streamFence' in ast.instruction_set
if instruction_set in ['neon', 'vsx', 'rvv']:
assert 'cachelineZero' in ast.instruction_set
if instruction_set in ['vsx']:
assert 'storeAAndFlushCacheline' in ast.instruction_set
for instruction in ['stream', 'streamFence', 'cachelineZero', 'storeAAndFlushCacheline', 'flushCacheline']:
if instruction in ast.instruction_set:
assert ast.instruction_set[instruction].split('{')[0] in ps.get_code_str(ast)
kernel = ast.compile()
# ps.show_code(ast)
dh.run_kernel(kernel)
np.testing.assert_equal(np.sum(dh.cpu_arrays['f']), np.prod(domain_size))
def test_nt_stores_symbolic_size(instruction_set=instruction_set):
f, g = ps.fields('f, g: [2D]', layout='fzyx')
update_rule = [ps.Assignment(f.center(), 0.0), ps.Assignment(g.center(), 0.0)]
opt = {'instruction_set': instruction_set, 'assume_aligned': True, 'nontemporal': True,
'assume_inner_stride_one': True}
config = pystencils.config.CreateKernelConfig(target=Target.CPU, cpu_vectorize_info=opt)
ast = ps.create_kernel(update_rule, config=config)
# ps.show_code(ast)
ast.compile()
def test_inplace_update(instruction_set=instruction_set):
shape = (9, 9, 3)
arr = np.ones(shape, order='f')
@ps.kernel
def update_rule(s):
f = ps.fields("f(3) : [2D]", f=arr)
s.tmp0 @= f(0)
s.tmp1 @= f(1)
s.tmp2 @= f(2)
f0, f1, f2 = f(0), f(1), f(2)
f0 @= 2 * s.tmp0
f1 @= 2 * s.tmp0
f2 @= 2 * s.tmp0
config = pystencils.config.CreateKernelConfig(cpu_vectorize_info={'instruction_set': instruction_set})
ast = ps.create_kernel(update_rule, config=config)
kernel = ast.compile()
kernel(f=arr)
np.testing.assert_equal(arr, 2)
def test_vectorization_fixed_size(instruction_set=instruction_set):
instructions = get_vector_instruction_set(instruction_set=instruction_set)
configurations = []
# Fixed size - multiple of four
arr = np.ones((20 + 2, 24 + 2)) * 5.0
f, g = ps.fields(f=arr, g=arr)
configurations.append((arr, f, g))
# Fixed size - not a multiple of four
arr = np.ones((21 + 2, 25 + 2)) * 5.0
f, g = ps.fields(f=arr, g=arr)
configurations.append((arr, f, g))
# Fixed size - different remainder
arr = np.ones((23 + 2, 17 + 2)) * 5.0
f, g = ps.fields(f=arr, g=arr)
configurations.append((arr, f, g))
for arr, f, g in configurations:
update_rule = [ps.Assignment(g[0, 0], f[0, 0] + f[-1, 0] + f[1, 0] + f[0, 1] + f[0, -1] + 42.0)]
ast = ps.create_kernel(update_rule)
vectorize(ast, instruction_set=instruction_set)
code = ps.get_code_str(ast)
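# The instruction set maps "+" to an intrinsic pattern, typically of the form "name({0}, {1})";
# strip the argument list and check that the add intrinsic's name appears in the generated code.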
add_instruction = instructions["+"][:instructions["+"].find("(")]
assert add_instruction in code
# print(code)
func = ast.compile()
dst = np.zeros_like(arr)
func(g=dst, f=arr)
np.testing.assert_equal(dst[1:-1, 1:-1], 5 * 5.0 + 42.0)
def test_vectorization_variable_size(instruction_set=instruction_set):
f, g = ps.fields("f, g : double[2D]")
update_rule = [ps.Assignment(g[0, 0], f[0, 0] + f[-1, 0] + f[1, 0] + f[0, 1] + f[0, -1] + 42.0)]
ast = ps.create_kernel(update_rule)
replace_inner_stride_with_one(ast)
vectorize(ast, instruction_set=instruction_set)
func = ast.compile()
arr = np.ones((23 + 2, 17 + 2)) * 5.0
dst = np.zeros_like(arr)
func(g=dst, f=arr)
np.testing.assert_equal(dst[1:-1, 1:-1], 5 * 5.0 + 42.0)
def test_piecewise1(instruction_set=instruction_set):
a, b, c, d, e = sp.symbols("a b c d e")
arr = np.ones((2 ** 3 + 2, 2 ** 4 + 2)) * 5.0
f, g = ps.fields(f=arr, g=arr)
update_rule = [ps.Assignment(a, f[1, 0]),
ps.Assignment(b, a),
ps.Assignment(c, f[0, 0] > 0.0),
ps.Assignment(g[0, 0], sp.Piecewise((b + 3 + f[0, 1], c), (0.0, True)))]
ast = ps.create_kernel(update_rule)
vectorize(ast, instruction_set=instruction_set)
func = ast.compile()
dst = np.zeros_like(arr)
func(g=dst, f=arr)
np.testing.assert_equal(dst[1:-1, 1:-1], 5 + 3 + 5.0)
def test_piecewise2(instruction_set=instruction_set):
arr = np.zeros((20, 20))
@ps.kernel
def test_kernel(s):
f, g = ps.fields(f=arr, g=arr)
s.condition @= f[0, 0] > 1
s.result @= 0.0 if s.condition else 1.0
g[0, 0] @= s.result
ast = ps.create_kernel(test_kernel)
# ps.show_code(ast)
vectorize(ast, instruction_set=instruction_set)
# ps.show_code(ast)
func = ast.compile()
func(f=arr, g=arr)
np.testing.assert_equal(arr, np.ones_like(arr))
def test_piecewise3(instruction_set=instruction_set):
arr = np.zeros((22, 22))
@ps.kernel
def test_kernel(s):
f, g = ps.fields(f=arr, g=arr)
s.b @= f[0, 1]
g[0, 0] @= 1.0 / (s.b + s.k) if f[0, 0] > 0.0 else 1.0
ast = ps.create_kernel(test_kernel)
# ps.show_code(ast)
vectorize(ast, instruction_set=instruction_set)
# ps.show_code(ast)
ast.compile()
def test_logical_operators(instruction_set=instruction_set):
arr = np.zeros((22, 22))
@ps.kernel
def kernel_and(s):
f, g = ps.fields(f=arr, g=arr)
s.c @= sp.And(f[0, 1] < 0.0, f[1, 0] < 0.0)
g[0, 0] @= sp.Piecewise([1.0 / f[1, 0], s.c], [1.0, True])
ast = ps.create_kernel(kernel_and)
vectorize(ast, instruction_set=instruction_set)
ast.compile()
@ps.kernel
def kernel_or(s):
f, g = ps.fields(f=arr, g=arr)
s.c @= sp.Or(f[0, 1] < 0.0, f[1, 0] < 0.0)
g[0, 0] @= sp.Piecewise([1.0 / f[1, 0], s.c], [1.0, True])
ast = ps.create_kernel(kernel_or)
vectorize(ast, instruction_set=instruction_set)
ast.compile()
@ps.kernel
def kernel_equal(s):
f, g = ps.fields(f=arr, g=arr)
s.c @= sp.Eq(f[0, 1], 2.0)
g[0, 0] @= sp.Piecewise([1.0 / f[1, 0], s.c], [1.0, True])
ast = ps.create_kernel(kernel_equal)
vectorize(ast, instruction_set=instruction_set)
ast.compile()
def test_hardware_query():
assert {'sse', 'neon', 'sve', 'sve2', 'sme', 'vsx', 'rvv'}.intersection(supported_instruction_sets)
def test_vectorised_pow(instruction_set=instruction_set):
arr = np.zeros((24, 24))
f, g = ps.fields(f=arr, g=arr)
as1 = ps.Assignment(g[0, 0], sp.Pow(f[0, 0], 2))
as2 = ps.Assignment(g[0, 0], sp.Pow(f[0, 0], 0.5))
as3 = ps.Assignment(g[0, 0], sp.Pow(f[0, 0], -0.5))
as4 = ps.Assignment(g[0, 0], sp.Pow(f[0, 0], 4))
as5 = ps.Assignment(g[0, 0], sp.Pow(f[0, 0], -4))
as6 = ps.Assignment(g[0, 0], sp.Pow(f[0, 0], -1))
ast = ps.create_kernel(as1)
vectorize(ast, instruction_set=instruction_set)
ast.compile()
ast = ps.create_kernel(as2)
vectorize(ast, instruction_set=instruction_set)
ast.compile()
ast = ps.create_kernel(as3)
vectorize(ast, instruction_set=instruction_set)
ast.compile()
ast = ps.create_kernel(as4)
vectorize(ast, instruction_set=instruction_set)
ast.compile()
ast = ps.create_kernel(as5)
vectorize(ast, instruction_set=instruction_set)
ast.compile()
ast = ps.create_kernel(as6)
vectorize(ast, instruction_set=instruction_set)
ast.compile()
def test_issue40(*_):
"""https://i10git.cs.fau.de/pycodegen/pystencils/-/issues/40"""
opt = {'instruction_set': "avx512", 'assume_aligned': False,
'nontemporal': False, 'assume_inner_stride_one': True}
src = ps.fields("src(1): double[2D]", layout='fzyx')
eq = [ps.Assignment(sp.Symbol('rho'), 1.0),
ps.Assignment(src[0, 0](0), sp.Rational(4, 9) * sp.Symbol('rho'))]
config = pystencils.config.CreateKernelConfig(target=Target.CPU, cpu_vectorize_info=opt, data_type='float64')
ast = ps.create_kernel(eq, config=config)
code = ps.get_code_str(ast)
assert 'epi32' not in code
import pytest
import numpy as np
import pystencils.config
import sympy as sp
import pystencils as ps
from pystencils.backends.simd_instruction_sets import (get_cacheline_size, get_supported_instruction_sets,
get_vector_instruction_set)
from pystencils.enums import Target
from pystencils.typing import CFunction
from . import test_vectorization
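# Fall back to an empty list so the parametrized tests below simply produce no runs
# on machines without a supported SIMD instruction set.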
supported_instruction_sets = get_supported_instruction_sets() if get_supported_instruction_sets() else []
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
def test_vectorisation_varying_arch(instruction_set):
shape = (9, 9, 3)
arr = np.ones(shape, order='f')
@ps.kernel
def update_rule(s):
f = ps.fields("f(3) : [2D]", f=arr)
s.tmp0 @= f(0)
s.tmp1 @= f(1)
s.tmp2 @= f(2)
f0, f1, f2 = f(0), f(1), f(2)
f0 @= 2 * s.tmp0
f1 @= 2 * s.tmp0
f2 @= 2 * s.tmp0
config = pystencils.config.CreateKernelConfig(cpu_vectorize_info={'instruction_set': instruction_set})
ast = ps.create_kernel(update_rule, config=config)
kernel = ast.compile()
kernel(f=arr)
np.testing.assert_equal(arr, 2)
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
def test_vectorized_abs_field(instruction_set, dtype):
"""Some instructions sets have abs, some don't.
Furthermore, the special treatment of unary minus makes this data type-sensitive too.
"""
arr = np.ones((2 ** 2 + 2, 2 ** 3 + 2), dtype=dtype)
arr[-3:, :] = -1
f, g = ps.fields(f=arr, g=arr)
update_rule = [ps.Assignment(g.center(), sp.Abs(f.center()))]
config = pystencils.config.CreateKernelConfig(cpu_vectorize_info={'instruction_set': instruction_set})
ast = ps.create_kernel(update_rule, config=config)
func = ast.compile()
dst = np.zeros_like(arr)
func(g=dst, f=arr)
np.testing.assert_equal(np.sum(dst[1:-1, 1:-1]), 2 ** 2 * 2 ** 3)
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
def test_vectorized_abs_scalar(instruction_set):
"""Some instructions sets have abs, some don't.
Furthermore, the special treatment of unary minus makes this data type-sensitive too.
"""
arr = np.zeros((2 ** 2 + 2, 2 ** 3 + 2), dtype="float64")
f = ps.fields(f=arr)
update_rule = [ps.Assignment(f.center(), sp.Abs(sp.Symbol("a")))]
config = pystencils.config.CreateKernelConfig(cpu_vectorize_info={'instruction_set': instruction_set})
ast = ps.create_kernel(update_rule, config=config)
func = ast.compile()
func(f=arr, a=-1)
np.testing.assert_equal(np.sum(arr[1:-1, 1:-1]), 2 ** 2 * 2 ** 3)
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
@pytest.mark.parametrize('nontemporal', [False, True])
def test_strided(instruction_set, dtype, nontemporal):
f, g = ps.fields(f"f, g : {dtype}[2D]")
update_rule = [ps.Assignment(g[0, 0], f[0, 0] + f[-1, 0] + f[1, 0] + f[0, 1] + f[0, -1] + 42.0)]
config = pystencils.config.CreateKernelConfig(cpu_vectorize_info={'instruction_set': instruction_set,
'nontemporal': nontemporal},
default_number_float=dtype)
if 'storeS' not in get_vector_instruction_set(dtype, instruction_set) \
and instruction_set not in ['avx512', 'avx512vl', 'rvv'] and not instruction_set.startswith('sve'):
with pytest.warns(UserWarning) as warn:
ast = ps.create_kernel(update_rule, config=config)
assert 'Could not vectorize loop' in warn[0].message.args[0]
else:
with pytest.warns(None) as warn:
ast = ps.create_kernel(update_rule, config=config)
assert len(warn) == 0
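# Check that the strided store (or its nontemporal 'streamS' variant) shows up in the generated code
# and that cachelineZero is not emitted for strided stores.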
instruction = 'streamS' if nontemporal and 'streamS' in ast.instruction_set else 'storeS'
assert ast.instruction_set[instruction].split('{')[0] in ps.get_code_str(ast)
instruction = 'cachelineZero'
if instruction in ast.instruction_set:
assert ast.instruction_set[instruction] not in ps.get_code_str(ast)
# ps.show_code(ast)
func = ast.compile()
ref_config = pystencils.config.CreateKernelConfig(default_number_float=dtype)
ref_func = ps.create_kernel(update_rule, config=ref_config).compile()
# For some reason other array creations fail on the emulated ppc pipeline
size = (25, 19)
arr = np.zeros(size).astype(dtype)
for i in range(size[0]):
for j in range(size[1]):
arr[i, j] = i * j
dst = np.zeros_like(arr, dtype=dtype)
ref = np.zeros_like(arr, dtype=dtype)
func(g=dst, f=arr)
ref_func(g=ref, f=arr)
# print("dst: ", dst)
# print("np array: ", arr)
np.testing.assert_almost_equal(dst[1:-1, 1:-1], ref[1:-1, 1:-1], 13 if dtype == 'float64' else 5)
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
@pytest.mark.parametrize('gl_field, gl_kernel', [(1, 0), (0, 1), (1, 1)])
def test_alignment_and_correct_ghost_layers(gl_field, gl_kernel, instruction_set, dtype):
domain_size = (128, 128)
dh = ps.create_data_handling(domain_size, periodicity=(True, True), default_target=Target.CPU)
src = dh.add_array("src", values_per_cell=1, dtype=dtype, ghost_layers=gl_field, alignment=True)
dh.fill(src.name, 1.0, ghost_layers=True)
dst = dh.add_array("dst", values_per_cell=1, dtype=dtype, ghost_layers=gl_field, alignment=True)
dh.fill(dst.name, 1.0, ghost_layers=True)
update_rule = ps.Assignment(dst[0, 0], src[0, 0])
opt = {'instruction_set': instruction_set, 'assume_aligned': True,
'nontemporal': True, 'assume_inner_stride_one': True}
config = pystencils.config.CreateKernelConfig(target=dh.default_target,
cpu_vectorize_info=opt, ghost_layers=gl_kernel)
ast = ps.create_kernel(update_rule, config=config)
kernel = ast.compile()
if ('loadA' in ast.instruction_set or 'storeA' in ast.instruction_set) and gl_kernel != gl_field:
with pytest.raises(ValueError):
dh.run_kernel(kernel)
else:
dh.run_kernel(kernel)
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
def test_cacheline_size(instruction_set):
cacheline_size = get_cacheline_size(instruction_set)
if cacheline_size is None and instruction_set in ['sse', 'avx', 'avx512', 'avx512vl', 'rvv']:
pytest.skip()
instruction_set = get_vector_instruction_set('double', instruction_set)
vector_size = instruction_set['bytes']
assert 8 < cacheline_size < 0x100000, "Cache line size is implausible"
if type(vector_size) is int:
assert cacheline_size % vector_size == 0, "Cache line size should be multiple of vector size"
assert cacheline_size & (cacheline_size - 1) == 0, "Cache line size is not a power of 2"
# TODO move to vectorise
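# Re-run every test from test_vectorization with each of the remaining instruction sets;
# the default set chosen in that module has already been exercised there.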
@pytest.mark.parametrize('instruction_set',
sorted(set(supported_instruction_sets) - {test_vectorization.instruction_set}))
@pytest.mark.parametrize('function',
[f for f in test_vectorization.__dict__ if f.startswith('test_') and f not in ['test_hardware_query', 'test_aligned_and_nt_stores']])
def test_vectorization_other(instruction_set, function):
test_vectorization.__dict__[function](instruction_set)
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
@pytest.mark.parametrize('field_layout', ('fzyx', 'zyxf'))
def test_square_root(dtype, instruction_set, field_layout):
config = pystencils.config.CreateKernelConfig(data_type=dtype,
default_number_float=dtype,
cpu_vectorize_info={'instruction_set': instruction_set,
'assume_inner_stride_one': True,
'assume_aligned': False,
'nontemporal': False})
src_field = ps.Field.create_generic('pdfs', 2, dtype, index_dimensions=1, layout=field_layout, index_shape=(9,))
eq = [ps.Assignment(sp.Symbol("xi"), sum(src_field.center_vector)),
ps.Assignment(sp.Symbol("xi_2"), sp.Symbol("xi") * sp.sqrt(src_field.center))]
ast = ps.create_kernel(eq, config=config)
ast.compile()
code = ps.get_code_str(ast)
print(code)
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
@pytest.mark.parametrize('padding', (True, False))
def test_square_root_2(dtype, instruction_set, padding):
x, y = sp.symbols("x y")
src = ps.fields(f"src: {dtype}[2D]", layout='fzyx')
up = ps.Assignment(src[0, 0], 1 / x + sp.sqrt(y * 0.52 + x ** 2))
cpu_vec = {'instruction_set': instruction_set, 'assume_inner_stride_one': True,
'assume_sufficient_line_padding': padding,
'assume_aligned': True}
config = ps.CreateKernelConfig(data_type=dtype, default_number_float=dtype, cpu_vectorize_info=cpu_vec)
ast = ps.create_kernel(up, config=config)
ast.compile()
code = ps.get_code_str(ast)
print(code)
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
@pytest.mark.parametrize('padding', (True, False))
def test_pow(dtype, instruction_set, padding):
config = pystencils.config.CreateKernelConfig(data_type=dtype,
default_number_float=dtype,
cpu_vectorize_info={'instruction_set': instruction_set,
'assume_inner_stride_one': True,
'assume_sufficient_line_padding': padding,
'assume_aligned': False, 'nontemporal': False})
src_field = ps.Field.create_generic('pdfs', 2, dtype, index_dimensions=1, layout='fzyx', index_shape=(9,))
eq = [ps.Assignment(sp.Symbol("xi"), sum(src_field.center_vector)),
ps.Assignment(sp.Symbol("xi_2"), sp.Symbol("xi") * sp.Pow(src_field.center, 0.5))]
ast = ps.create_kernel(eq, config=config)
ast.compile()
code = ps.get_code_str(ast)
print(code)
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
@pytest.mark.parametrize('padding', (True, False))
def test_issue62(dtype, instruction_set, padding):
opt = {'instruction_set': instruction_set, 'assume_aligned': True,
'assume_inner_stride_one': True,
'assume_sufficient_line_padding': padding}
dx = sp.Symbol("dx")
dy = sp.Symbol("dy")
src, dst, rhs = ps.fields(f"src, src_tmp, rhs: {dtype}[2D]", layout='fzyx')
up = ps.Assignment(dst[0, 0], ((dy ** 2 * (src[1, 0] + src[-1, 0]))
+ (dx ** 2 * (src[0, 1] + src[0, -1]))
- (rhs[0, 0] * dx ** 2 * dy ** 2)) / (2 * (dx ** 2 + dy ** 2)))
config = ps.CreateKernelConfig(data_type=dtype,
default_number_float=dtype,
cpu_vectorize_info=opt)
ast = ps.create_kernel(up, config=config)
ast.compile()
code = ps.get_code_str(ast)
print(code)
assert 'pow' not in code
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('instruction_set', supported_instruction_sets)
def test_div_and_unevaluated_expr(dtype, instruction_set):
opt = {'instruction_set': instruction_set, 'assume_aligned': True, 'assume_inner_stride_one': True,
'assume_sufficient_line_padding': False}
x, y, z = sp.symbols("x y z")
rhs = (-4 * x ** 4 * y ** 2 * z ** 2 + (x ** 4 * y ** 2 / 3) + (x ** 4 * z ** 2 / 3)) / x ** 3
src = ps.fields(f"src: {dtype}[2D]", layout='fzyx')
up = ps.Assignment(src[0, 0], rhs)
config = ps.CreateKernelConfig(data_type=dtype,
default_number_float=dtype,
cpu_vectorize_info=opt)
ast = ps.create_kernel(up, config=config)
code = ps.get_code_str(ast)
# print(code)
ast.compile()
assert 'pow' not in code
@pytest.mark.parametrize('dtype', ('float32', 'float64'))
@pytest.mark.parametrize('instruction_set', ('sve', 'sve2', 'sme', 'rvv'))
def test_check_ast_parameters_sizeless(dtype, instruction_set):
f, g = ps.fields(f"f, g: {dtype}[3D]", layout='fzyx')
update_rule = [ps.Assignment(g.center(), 2 * f.center())]
config = pystencils.config.CreateKernelConfig(data_type=dtype,
cpu_vectorize_info={'instruction_set': instruction_set})
ast = ps.create_kernel(update_rule, config=config)
ast_symbols = [p.symbol for p in ast.get_parameters()]
assert ast.instruction_set['width'] not in ast_symbols
assert ast.instruction_set['intwidth'] not in ast_symbols
# TODO: This test case needs a complete rework of the vectoriser. The reason is that the vectoriser currently does not
# TODO: vectorise symbols, because they could be strides or field sizes and thus be involved in pointer arithmetic.
# TODO: This means that the vectoriser only works if fields are involved on the rhs.
# @pytest.mark.parametrize('dtype', ('float32', 'float64'))
# @pytest.mark.parametrize('instruction_set', supported_instruction_sets)
# def test_vectorised_symbols(dtype, instruction_set):
# opt = {'instruction_set': instruction_set, 'assume_aligned': True, 'assume_inner_stride_one': True,
# 'assume_sufficient_line_padding': False}
#
# x, y, z = sp.symbols("x y z")
# rhs = 1 / x ** 2 * (x + y)
#
# src = ps.fields(f"src: {dtype}[2D]", layout='fzyx')
#
# up = ps.Assignment(src[0, 0], rhs)
#
# config = ps.CreateKernelConfig(data_type=dtype,
# default_number_float=dtype,
# cpu_vectorize_info=opt)
#
# ast = ps.create_kernel(up, config=config)
# code = ps.get_code_str(ast)
# print(code)
#
# ast.compile()
#
# assert 'pow' not in code
import pystencils as ps
def test_version_string():
version = ps.__version__
print(version)
numeric_version = [int(x, 10) for x in version.split('.')[0:1]]
test_version = sum(x * (100 ** i) for i, x in enumerate(numeric_version))
assert test_version >= 1