Skip to content
Snippets Groups Projects
test_datahandling.py 12 KiB
Newer Older
import os
from tempfile import TemporaryDirectory
from pathlib import Path

import numpy as np

import pystencils as ps
from pystencils import create_data_handling, create_kernel
Markus Holzer's avatar
Markus Holzer committed
from pystencils.datahandling.pycuda import PyCudaArrayHandler
Jan Hönig's avatar
Jan Hönig committed
from pystencils.enums import Target
try:
    import pytest
except ImportError:
    import unittest.mock
    pytest = unittest.mock.MagicMock()

SCRIPT_FOLDER = Path(__file__).parent.absolute()
INPUT_FOLDER = SCRIPT_FOLDER / "test_data"

def basic_iteration(dh):
    dh.add_array('basic_iter_test_gl_default')
    dh.add_array('basic_iter_test_gl_3', ghost_layers=3)

    for b in dh.iterate():
        assert b.shape == b['basic_iter_test_gl_3'].shape
        assert b.shape == b['basic_iter_test_gl_default'].shape


def access_and_gather(dh, domain_size):
Markus Holzer's avatar
Markus Holzer committed
    dh.add_array('f1', dtype=np.dtype(np.int8))
    dh.add_array_like('f2', 'f1')
    dh.add_array('v1', values_per_cell=3, dtype=np.int64, ghost_layers=2)
    dh.add_array_like('v2', 'v1')

    dh.swap('f1', 'f2')
    dh.swap('v1', 'v2')

    # Check symbolic field properties
    assert dh.fields.f1.index_dimensions == 0
    assert dh.fields.f1.spatial_dimensions == len(domain_size)
Markus Holzer's avatar
Markus Holzer committed
    assert dh.fields.f1.dtype.numpy_dtype == np.int8

    assert dh.fields.v1.index_dimensions == 1
    assert dh.fields.v1.spatial_dimensions == len(domain_size)
    assert dh.fields.v1.dtype.numpy_dtype == np.int64

    for b in dh.iterate(ghost_layers=0):
        val = sum(b.cell_index_arrays)
        np.copyto(b['f1'], val)
        for i, coord_arr in enumerate(b.cell_index_arrays):
            np.copyto(b['v1'][..., i], coord_arr)

    full_arr = dh.gather_array('v1')
    if full_arr is not None:
        expected_shape = domain_size + (3,)
        assert full_arr.shape == expected_shape
        for x in range(full_arr.shape[0]):
            for y in range(full_arr.shape[1]):
                if len(domain_size) == 3:
                    for z in range(full_arr.shape[2]):
                        assert full_arr[x, y, z, 0] == x
                        assert full_arr[x, y, z, 1] == y
                        assert full_arr[x, y, z, 2] == z
                else:
                    assert len(domain_size) == 2
                    assert full_arr[x, y, 0] == x
                    assert full_arr[x, y, 1] == y

    full_arr = dh.gather_array('f1')
    if full_arr is not None:
        expected_shape = domain_size
        assert full_arr.shape == expected_shape
        for x in range(full_arr.shape[0]):
            for y in range(full_arr.shape[1]):
                if len(domain_size) == 3:
                    for z in range(full_arr.shape[2]):
                        assert full_arr[x, y, z] == x + y + z
                else:
                    assert len(domain_size) == 2
                    assert full_arr[x, y] == x + y


def synchronization(dh, test_gpu=False):
    field_name = 'comm_field_test'
    if test_gpu:
        try:
            from pycuda import driver
            import pycuda.autoinit
        except ImportError:
            return
        field_name += 'Gpu'

Markus Holzer's avatar
Markus Holzer committed
    dh.add_array(field_name, ghost_layers=1, dtype=np.int8, cpu=True, gpu=test_gpu)

    # initialize everything with 1
    for b in dh.iterate(ghost_layers=1):
        b[field_name].fill(1)
    for b in dh.iterate(ghost_layers=0):
        b[field_name].fill(42)

    if test_gpu:
        dh.to_gpu(field_name)
Markus Holzer's avatar
Markus Holzer committed
        dh.synchronization_function_gpu(field_name)()
    else:
        dh.synchronization_function_cpu(field_name)()


    if test_gpu:
        dh.to_cpu(field_name)

    for b in dh.iterate(ghost_layers=1):
        np.testing.assert_equal(42, b[field_name])


def kernel_execution_jacobi(dh, target):
Markus Holzer's avatar
Markus Holzer committed
    test_gpu = target == Target.GPU
    dh.add_array('f', gpu=test_gpu)
    dh.add_array('tmp', gpu=test_gpu)

    if test_gpu:
        assert dh.is_on_gpu('f')
        assert dh.is_on_gpu('tmp')

    stencil_2d = [(1, 0), (-1, 0), (0, 1), (0, -1)]
    stencil_3d = [(1, 0, 0), (-1, 0, 0), (0, 1, 0), (0, -1, 0), (0, 0, 1), (0, 0, -1)]
    stencil = stencil_2d if dh.dim == 2 else stencil_3d

    @ps.kernel
    def jacobi():
        dh.fields.tmp.center @= sum(dh.fields.f.neighbors(stencil)) / len(stencil)

Jan Hönig's avatar
Jan Hönig committed
    kernel = create_kernel(jacobi, target=target).compile()
    for b in dh.iterate(ghost_layers=1):
        b['f'].fill(42)
    dh.run_kernel(kernel)
    for b in dh.iterate(ghost_layers=0):
        np.testing.assert_equal(b['f'], 42)


def vtk_output(dh):
Michael Kuron's avatar
Michael Kuron committed
    pytest.importorskip('pyevtk')
    dh.add_array('scalar_field')
    dh.add_array('vector_field', values_per_cell=dh.dim)
    dh.add_array('multiple_scalar_field', values_per_cell=9)
    dh.add_array('flag_field', dtype=np.uint16)

    fields_names = ['scalar_field', 'vector_field', 'multiple_scalar_field', 'flag_field']
    with TemporaryDirectory() as tmp_dir:
        writer1 = dh.create_vtk_writer(os.path.join(tmp_dir, "out1"), fields_names, ghost_layers=True)
        writer2 = dh.create_vtk_writer(os.path.join(tmp_dir, "out2"), fields_names, ghost_layers=False)
        masks_to_name = {1: 'flag1', 5: 'some_mask'}
        writer3 = dh.create_vtk_writer_for_flag_array(os.path.join(tmp_dir, "out3"), 'flag_field', masks_to_name)
        writer1(1)
        writer2(1)
        writer3(1)


def reduction(dh):
    float_seq = [1.0, 2.0, 3.0]
    int_seq = [1, 2, 3]
    for op in ('min', 'max', 'sum'):
        assert (dh.reduce_float_sequence(float_seq, op) == float_seq).all()
        assert (dh.reduce_int_sequence(int_seq, op) == int_seq).all()


def test_symbolic_fields():
    dh = create_data_handling(domain_size=(5, 7))
    dh.add_array('f1', values_per_cell=dh.dim)
    assert dh.fields['f1'].spatial_dimensions == dh.dim
    assert dh.fields['f1'].index_dimensions == 1

    dh.add_array_like("f_tmp", "f1", latex_name=r"f_{tmp}")
    assert dh.fields['f_tmp'].spatial_dimensions == dh.dim
    assert dh.fields['f_tmp'].index_dimensions == 1

    dh.swap('f1', 'f_tmp')


def test_access():
    for domain_shape in [(2, 3, 4), (2, 4)]:
        for f_size in (1, 4):
            dh = create_data_handling(domain_size=domain_shape)
            dh.add_array('f1', values_per_cell=f_size)
            assert dh.dim == len(domain_shape)

            for b in dh.iterate(ghost_layers=1):
                if f_size > 1:
                    assert b['f1'].shape == tuple(ds+2 for ds in domain_shape) + (f_size,)
                else:
                    assert b['f1'].shape == tuple(ds + 2 for ds in domain_shape)

            for b in dh.iterate(ghost_layers=0):
                if f_size > 1:
                    assert b['f1'].shape == domain_shape + (f_size,)
                else:
                    assert b['f1'].shape == domain_shape


def test_access_and_gather():
    for domain_shape in [(2, 2, 3), (2, 3)]:
        dh = create_data_handling(domain_size=domain_shape, periodicity=True)
        access_and_gather(dh, domain_shape)
        synchronization(dh, test_gpu=False)
        synchronization(dh, test_gpu=True)


def test_kernel():
    for domain_shape in [(4, 5), (3, 4, 5)]:
        dh = create_data_handling(domain_size=domain_shape, periodicity=True)
        assert all(dh.periodicity)
Jan Hönig's avatar
Jan Hönig committed
        kernel_execution_jacobi(dh, Target.CPU)
Martin Bauer's avatar
Martin Bauer committed
        try:
            import pycuda
            dh = create_data_handling(domain_size=domain_shape, periodicity=True)
Jan Hönig's avatar
Jan Hönig committed
            kernel_execution_jacobi(dh, Target.GPU)
Martin Bauer's avatar
Martin Bauer committed
        except ImportError:
            pass

Markus Holzer's avatar
Markus Holzer committed
@pytest.mark.parametrize('target', (Target.CPU, Target.GPU))
def test_kernel_param(target):
    for domain_shape in [(4, 5), (3, 4, 5)]:
Jan Hönig's avatar
Jan Hönig committed
        if target == Target.GPU:
            pytest.importorskip('pycuda')

        dh = create_data_handling(domain_size=domain_shape, periodicity=True, default_target=target)
        kernel_execution_jacobi(dh, target)
        reduction(dh)


def test_vtk_output():
Michael Kuron's avatar
Michael Kuron committed
    pytest.importorskip('pyevtk')
    for domain_shape in [(4, 5), (3, 4, 5)]:
        dh = create_data_handling(domain_size=domain_shape, periodicity=True)
        vtk_output(dh)


def test_add_arrays():
    domain_shape = (3, 4, 5)
    field_description = 'x, y(9)'

    dh = create_data_handling(domain_size=domain_shape, default_ghost_layers=0, default_layout='numpy')
    x_, y_ = dh.add_arrays(field_description)

    x, y = ps.fields(field_description + ': [3,4,5]')

    assert x_ == x
    assert y_ == y
    assert x == dh.fields['x']
    assert y == dh.fields['y']


def test_get_kwarg():
    domain_shape = (10, 10)
    field_description = 'src, dst'

    dh = create_data_handling(domain_size=domain_shape, default_ghost_layers=1)
    src, dst = dh.add_arrays(field_description)
    dh.fill("src", 1.0, ghost_layers=True)
    dh.fill("dst", 0.0, ghost_layers=True)

Markus Holzer's avatar
Markus Holzer committed
    with pytest.raises(ValueError):
        dh.add_array('src')

    ur = ps.Assignment(src.center, dst.center)
    kernel = ps.create_kernel(ur).compile()

    kw = dh.get_kernel_kwargs(kernel)
    assert np.all(kw[0]['src'] == dh.cpu_arrays['src'])
    assert np.all(kw[0]['dst'] == dh.cpu_arrays['dst'])


def test_add_custom_data():
    pytest.importorskip('pycuda')

    import pycuda.gpuarray as gpuarray
    import pycuda.autoinit  # noqa

    def cpu_data_create_func():
        return np.ones((2, 2), dtype=np.float64)

    def gpu_data_create_func():
        return gpuarray.zeros((2, 2), dtype=np.float64)

    def cpu_to_gpu_transfer_func(gpuarr, cpuarray):
        gpuarr.set(cpuarray)

    def gpu_to_cpu_transfer_func(gpuarr, cpuarray):
        gpuarr.get(cpuarray)

    dh = create_data_handling(domain_size=(10, 10))
    dh.add_custom_data('custom_data',
                       cpu_data_create_func,
                       gpu_data_create_func,
                       cpu_to_gpu_transfer_func,
                       gpu_to_cpu_transfer_func)

    assert np.all(dh.custom_data_cpu['custom_data'] == 1)
    assert np.all(dh.custom_data_gpu['custom_data'].get() == 0)

    dh.to_cpu(name='custom_data')
    dh.to_gpu(name='custom_data')

    assert 'custom_data' in dh.custom_data_names


def test_log():
    dh = create_data_handling(domain_size=(10, 10))
    dh.log_on_root()
    assert dh.is_root
    assert dh.world_rank == 0


def test_save_data():
    domain_shape = (2, 2)

    dh = create_data_handling(domain_size=domain_shape, default_ghost_layers=1)
    dh.add_array("src", values_per_cell=9)
    dh.fill("src", 1.0, ghost_layers=True)
    dh.add_array("dst", values_per_cell=9)
    dh.fill("dst", 1.0, ghost_layers=True)

    dh.save_all(str(INPUT_FOLDER) + '/datahandling_save_test')


def test_load_data():
    domain_shape = (2, 2)

    dh = create_data_handling(domain_size=domain_shape, default_ghost_layers=1)
    dh.add_array("src", values_per_cell=9)
    dh.fill("src", 0.0, ghost_layers=True)
    dh.add_array("dst", values_per_cell=9)
    dh.fill("dst", 0.0, ghost_layers=True)

    dh.load_all(str(INPUT_FOLDER) + '/datahandling_load_test')
    assert np.all(dh.cpu_arrays['src']) == 1
    assert np.all(dh.cpu_arrays['dst']) == 1

    domain_shape = (3, 3)

    dh = create_data_handling(domain_size=domain_shape, default_ghost_layers=1)
    dh.add_array("src", values_per_cell=9)
    dh.fill("src", 0.0, ghost_layers=True)
    dh.add_array("dst", values_per_cell=9)
    dh.fill("dst", 0.0, ghost_layers=True)
    dh.add_array("dst2", values_per_cell=9)
    dh.fill("dst2", 0.0, ghost_layers=True)

    dh.load_all(str(INPUT_FOLDER) + '/datahandling_load_test')
    assert np.all(dh.cpu_arrays['src']) == 0
    assert np.all(dh.cpu_arrays['dst']) == 0
    assert np.all(dh.cpu_arrays['dst2']) == 0
Markus Holzer's avatar
Markus Holzer committed
def test_array_handler():
Markus Holzer's avatar
Markus Holzer committed
    size = (2, 2)
Markus Holzer's avatar
Markus Holzer committed
    pytest.importorskip('pycuda')
    array_handler = PyCudaArrayHandler()
Markus Holzer's avatar
Markus Holzer committed

    zero_array = array_handler.zeros(size)
    cpu_array = np.empty(size)
    array_handler.download(zero_array, cpu_array)
    assert np.all(cpu_array) == 0

    ones_array = array_handler.ones(size)
    cpu_array = np.empty(size)
    array_handler.download(ones_array, cpu_array)
    assert np.all(cpu_array) == 1

    empty = array_handler.empty(size)
    assert empty.strides == (16, 8)
    empty = array_handler.empty(shape=size, layout=(1, 0))
    assert empty.strides == (8, 16)

    random_array = array_handler.randn(size)