Skip to content
Snippets Groups Projects
test_fvm.py 5.11 KiB
import sympy as sp
import pystencils as ps
import numpy as np
import pytest
from itertools import product


@pytest.mark.parametrize("dim", [2, 3])
def test_advection_diffusion(dim: int):
    # parameters
    if dim == 2:
        domain_size = (32, 32)
        flux_neighbors = 4
    elif dim == 3:
        domain_size = (16, 16, 16)
        flux_neighbors = 13

    dh = ps.create_data_handling(
        domain_size=domain_size, periodicity=True, default_target='cpu')

    n_field = dh.add_array('n', values_per_cell=1)
    j_field = dh.add_array('j', values_per_cell=flux_neighbors,
                           field_type=ps.FieldType.STAGGERED_FLUX)
    velocity_field = dh.add_array('v', values_per_cell=dim)

    D = 0.0666
    time = 200

    def grad(f):
        return sp.Matrix([ps.fd.diff(f, i) for i in range(dim)])

    flux_eq = - D * grad(n_field)
    fvm_eq = ps.fd.FVM1stOrder(n_field, flux=flux_eq)

    vof_adv = ps.fd.VOF(j_field, velocity_field, n_field)

    # merge calculation of advection and diffusion terms
    flux = []
    for adv, div in zip(vof_adv, fvm_eq.discrete_flux(j_field)):
        assert adv.lhs == div.lhs
        flux.append(ps.Assignment(adv.lhs, adv.rhs + div.rhs))

    flux_kernel = ps.create_staggered_kernel(flux).compile()

    pde_kernel = ps.create_kernel(
        fvm_eq.discrete_continuity(j_field)).compile()

    sync_conc = dh.synchronization_function([n_field.name])

    # analytical density calculation
    def density(pos: np.ndarray, time: int):
        return (4 * np.pi * D * time)**(-1.5) * \
            np.exp(-np.sum(np.square(pos), axis=dim) / (4 * D * time))

    pos = np.zeros((*domain_size, dim))
    xpos = np.arange(-domain_size[0] // 2, domain_size[0] // 2)
    ypos = np.arange(-domain_size[1] // 2, domain_size[1] // 2)

    if dim == 2:
        pos[..., 1], pos[..., 0] = np.meshgrid(xpos, ypos)
    elif dim == 3:
        zpos = np.arange(-domain_size[2] // 2, domain_size[2] // 2)
        pos[..., 2], pos[..., 1], pos[..., 0] = np.meshgrid(xpos, ypos, zpos)

    def run(velocity: np.ndarray, time: int):
        print(f"{velocity}, {time}")
        dh.fill(n_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
        dh.fill(j_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)

        # set initial values for velocity and density
        for i in range(dim):
            dh.fill(velocity_field.name, velocity[i], i, ghost_layers=True, inner_ghost_layers=True)
        dh.fill(n_field.name, 0)
        dh.fill(n_field.name, 1, slice_obj=ps.make_slice[[
                dom // 2 for dom in domain_size]])

        sync_conc()
        for i in range(time):
            dh.run_kernel(flux_kernel)
            dh.run_kernel(pde_kernel)
            sync_conc()

        calc_density = density(pos - velocity * time, time)

        np.testing.assert_allclose(dh.gather_array(
            n_field.name), calc_density, atol=1e-2, rtol=0)

    for vel in product([0, -0.08, 0.08], repeat=dim):
        run(np.array(vel), time)


def test_ek():

    # parameters

    L = (40, 40)
    D = sp.Symbol("D")
    z = sp.Symbol("z")

    # data structures

    dh = ps.create_data_handling(L, periodicity=True, default_target='cpu')
    c = dh.add_array('c', values_per_cell=1)
    v = dh.add_array('v', values_per_cell=dh.dim)
    j = dh.add_array('j', values_per_cell=dh.dim * 2, field_type=ps.FieldType.STAGGERED_FLUX)
    Phi = dh.add_array('Φ', values_per_cell=1)

    # perform automatic discretization

    def Gradient(f):
        return sp.Matrix([ps.fd.diff(f, i) for i in range(dh.dim)])

    flux_eq = -D * Gradient(c) + D * z * c.center * Gradient(Phi)

    disc = ps.fd.FVM1stOrder(c, flux_eq)
    flux_assignments = disc.discrete_flux(j)
    advection_assignments = ps.fd.VOF(j, v, c)
    continuity_assignments = disc.discrete_continuity(j)

    # manual discretization

    x_staggered = - c[-1, 0] + c[0, 0] + z * (c[-1, 0] + c[0, 0]) / 2 * (Phi[-1, 0] - Phi[0, 0])
    y_staggered = - c[0, -1] + c[0, 0] + z * (c[0, -1] + c[0, 0]) / 2 * (Phi[0, -1] - Phi[0, 0])
    xy_staggered = - c[-1, -1] + c[0, 0] + z * (c[-1, -1] + c[0, 0]) / 2 * (Phi[-1, -1] - Phi[0, 0])
    xY_staggered = - c[-1, 1] + c[0, 0] + z * (c[-1, 1] + c[0, 0]) / 2 * (Phi[-1, 1] - Phi[0, 0])

    jj = j.staggered_access
    divergence = -1 / (1 + sp.sqrt(2) if j.index_shape[0] == 4 else 1) * \
        sum([jj(d) / sp.Matrix(ps.stencil.direction_string_to_offset(d)).norm() for d in j.staggered_stencil
            + [ps.stencil.inverse_direction_string(d) for d in j.staggered_stencil]])

    update = [ps.Assignment(c.center, c.center + divergence)]
    flux = [ps.Assignment(j.staggered_access("W"), D * x_staggered),
            ps.Assignment(j.staggered_access("S"), D * y_staggered)]
    if j.index_shape[0] == 4:
        flux += [ps.Assignment(j.staggered_access("SW"), D * xy_staggered),
                 ps.Assignment(j.staggered_access("NW"), D * xY_staggered)]

    # compare

    for a, b in zip(flux, flux_assignments):
        assert a.lhs == b.lhs
        assert sp.simplify(a.rhs - b.rhs) == 0
    for a, b in zip(update, continuity_assignments):
        assert a.lhs == b.lhs
        assert a.rhs == b.rhs

# TODO: test source