diff --git a/pystencils/fd/finitevolumes.py b/pystencils/fd/finitevolumes.py index 4119ac845a35e5e93c747799aa569dadde9455a3..238bb422db37f9e271f87060b0db8c39c09f68c4 100644 --- a/pystencils/fd/finitevolumes.py +++ b/pystencils/fd/finitevolumes.py @@ -213,7 +213,7 @@ def VOF(j: ps.field.Field, v: ps.field.Field, Ï: ps.field.Field): v1 = v.neighbor_vector(c) # going out - cond = sp.And(*[sp.Or(c[i] * v1[i] > 0, c[i] == 0) for i in range(len(v0))]) + cond = sp.And(*[sp.Or(c[i] * v0[i] > 0, c[i] == 0) for i in range(len(v0))]) overlap1 = [1 - sp.Abs(v0[i]) for i in range(len(v0))] overlap2 = [c[i] * v0[i] for i in range(len(v0))] overlap = sp.Mul(*[(overlap1[i] if c[i] == 0 else overlap2[i]) for i in range(len(v0))]) @@ -222,13 +222,13 @@ def VOF(j: ps.field.Field, v: ps.field.Field, Ï: ps.field.Field): # coming in cond = sp.And(*[sp.Or(c[i] * v1[i] < 0, c[i] == 0) for i in range(len(v1))]) overlap1 = [1 - sp.Abs(v1[i]) for i in range(len(v1))] - overlap2 = [c[i] * v1[i] for i in range(len(v1))] + overlap2 = [v1[i] for i in range(len(v1))] overlap = sp.Mul(*[(overlap1[i] if c[i] == 0 else overlap2[i]) for i in range(len(v1))]) fluxes[d].append(Ï.neighbor_vector(c) * overlap * sp.Piecewise((1, cond), (0, True))) for i, ff in enumerate(fluxes): fluxes[i] = ff[0] - for f in ff: + for f in ff[1:]: fluxes[i] += f assignments = [] diff --git a/pystencils_tests/test_fvm.py b/pystencils_tests/test_fvm.py index 580fdffe1b03e479d9e5312faee426e3bd21e5ce..11da32f8bfe1622a45ec6ad24294cf6c0dcb8176 100644 --- a/pystencils_tests/test_fvm.py +++ b/pystencils_tests/test_fvm.py @@ -1,5 +1,92 @@ import sympy as sp import pystencils as ps +import numpy as np +import pytest +from itertools import product + + +@pytest.mark.parametrize("dim", [2, 3]) +def test_advection_diffusion(dim: int): + # parameters + if dim == 2: + domain_size = (32, 32) + flux_neighbors = 4 + elif dim == 3: + domain_size = (16, 16, 16) + flux_neighbors = 13 + + dh = ps.create_data_handling( + domain_size=domain_size, periodicity=True, default_target='cpu') + + n_field = dh.add_array('n', values_per_cell=1) + j_field = dh.add_array('j', values_per_cell=flux_neighbors, + field_type=ps.FieldType.STAGGERED_FLUX) + velocity_field = dh.add_array('v', values_per_cell=dim) + + D = 0.0666 + time = 200 + + def grad(f): + return sp.Matrix([ps.fd.diff(f, i) for i in range(dim)]) + + flux_eq = - D * grad(n_field) + fvm_eq = ps.fd.FVM1stOrder(n_field, flux=flux_eq) + + vof_adv = ps.fd.VOF(j_field, velocity_field, n_field) + + # merge calculation of advection and diffusion terms + flux = [] + for adv, div in zip(vof_adv, fvm_eq.discrete_flux(j_field)): + assert adv.lhs == div.lhs + flux.append(ps.Assignment(adv.lhs, adv.rhs + div.rhs)) + + flux_kernel = ps.create_staggered_kernel(flux).compile() + + pde_kernel = ps.create_kernel( + fvm_eq.discrete_continuity(j_field)).compile() + + sync_conc = dh.synchronization_function([n_field.name]) + + # analytical density calculation + def density(pos: np.ndarray, time: int): + return (4 * np.pi * D * time)**(-1.5) * \ + np.exp(-np.sum(np.square(pos), axis=dim) / (4 * D * time)) + + pos = np.zeros((*domain_size, dim)) + xpos = np.arange(-domain_size[0] // 2, domain_size[0] // 2) + ypos = np.arange(-domain_size[1] // 2, domain_size[1] // 2) + + if dim == 2: + pos[..., 1], pos[..., 0] = np.meshgrid(xpos, ypos) + elif dim == 3: + zpos = np.arange(-domain_size[2] // 2, domain_size[2] // 2) + pos[..., 2], pos[..., 1], pos[..., 0] = np.meshgrid(xpos, ypos, zpos) + + def run(velocity: np.ndarray, time: int): + print(f"{velocity}, {time}") + dh.fill(n_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True) + dh.fill(j_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True) + + # set initial values for velocity and density + for i in range(dim): + dh.fill(velocity_field.name, velocity[i], i, ghost_layers=True, inner_ghost_layers=True) + dh.fill(n_field.name, 0) + dh.fill(n_field.name, 1, slice_obj=ps.make_slice[[ + dom // 2 for dom in domain_size]]) + + sync_conc() + for i in range(time): + dh.run_kernel(flux_kernel) + dh.run_kernel(pde_kernel) + sync_conc() + + calc_density = density(pos - velocity * time, time) + + np.testing.assert_allclose(dh.gather_array( + n_field.name), calc_density, atol=1e-2, rtol=0) + + for vel in product([0, -0.08, 0.08], repeat=dim): + run(np.array(vel), time) def test_ek(): @@ -59,3 +146,4 @@ def test_ek(): assert a.rhs == b.rhs # TODO: test advection and source +