-
Michael Kuron authoredabea8714
test_fvm.py 5.11 KiB
import sympy as sp
import pystencils as ps
import numpy as np
import pytest
from itertools import product
@pytest.mark.parametrize("dim", [2, 3])
def test_advection_diffusion(dim: int):
# parameters
if dim == 2:
domain_size = (32, 32)
flux_neighbors = 4
elif dim == 3:
domain_size = (16, 16, 16)
flux_neighbors = 13
dh = ps.create_data_handling(
domain_size=domain_size, periodicity=True, default_target='cpu')
n_field = dh.add_array('n', values_per_cell=1)
j_field = dh.add_array('j', values_per_cell=flux_neighbors,
field_type=ps.FieldType.STAGGERED_FLUX)
velocity_field = dh.add_array('v', values_per_cell=dim)
D = 0.0666
time = 200
def grad(f):
return sp.Matrix([ps.fd.diff(f, i) for i in range(dim)])
flux_eq = - D * grad(n_field)
fvm_eq = ps.fd.FVM1stOrder(n_field, flux=flux_eq)
vof_adv = ps.fd.VOF(j_field, velocity_field, n_field)
# merge calculation of advection and diffusion terms
flux = []
for adv, div in zip(vof_adv, fvm_eq.discrete_flux(j_field)):
assert adv.lhs == div.lhs
flux.append(ps.Assignment(adv.lhs, adv.rhs + div.rhs))
flux_kernel = ps.create_staggered_kernel(flux).compile()
pde_kernel = ps.create_kernel(
fvm_eq.discrete_continuity(j_field)).compile()
sync_conc = dh.synchronization_function([n_field.name])
# analytical density calculation
def density(pos: np.ndarray, time: int):
return (4 * np.pi * D * time)**(-1.5) * \
np.exp(-np.sum(np.square(pos), axis=dim) / (4 * D * time))
pos = np.zeros((*domain_size, dim))
xpos = np.arange(-domain_size[0] // 2, domain_size[0] // 2)
ypos = np.arange(-domain_size[1] // 2, domain_size[1] // 2)
if dim == 2:
pos[..., 1], pos[..., 0] = np.meshgrid(xpos, ypos)
elif dim == 3:
zpos = np.arange(-domain_size[2] // 2, domain_size[2] // 2)
pos[..., 2], pos[..., 1], pos[..., 0] = np.meshgrid(xpos, ypos, zpos)
def run(velocity: np.ndarray, time: int):
print(f"{velocity}, {time}")
dh.fill(n_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(j_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
# set initial values for velocity and density
for i in range(dim):
dh.fill(velocity_field.name, velocity[i], i, ghost_layers=True, inner_ghost_layers=True)
dh.fill(n_field.name, 0)
dh.fill(n_field.name, 1, slice_obj=ps.make_slice[[
dom // 2 for dom in domain_size]])
sync_conc()
for i in range(time):
dh.run_kernel(flux_kernel)
dh.run_kernel(pde_kernel)
sync_conc()
calc_density = density(pos - velocity * time, time)
np.testing.assert_allclose(dh.gather_array(
n_field.name), calc_density, atol=1e-2, rtol=0)
for vel in product([0, -0.08, 0.08], repeat=dim):
run(np.array(vel), time)
def test_ek():
# parameters
L = (40, 40)
D = sp.Symbol("D")
z = sp.Symbol("z")
# data structures
dh = ps.create_data_handling(L, periodicity=True, default_target='cpu')
c = dh.add_array('c', values_per_cell=1)
v = dh.add_array('v', values_per_cell=dh.dim)
j = dh.add_array('j', values_per_cell=dh.dim * 2, field_type=ps.FieldType.STAGGERED_FLUX)
Phi = dh.add_array('Φ', values_per_cell=1)
# perform automatic discretization
def Gradient(f):
return sp.Matrix([ps.fd.diff(f, i) for i in range(dh.dim)])
flux_eq = -D * Gradient(c) + D * z * c.center * Gradient(Phi)
disc = ps.fd.FVM1stOrder(c, flux_eq)
flux_assignments = disc.discrete_flux(j)
advection_assignments = ps.fd.VOF(j, v, c)
continuity_assignments = disc.discrete_continuity(j)
# manual discretization
x_staggered = - c[-1, 0] + c[0, 0] + z * (c[-1, 0] + c[0, 0]) / 2 * (Phi[-1, 0] - Phi[0, 0])
y_staggered = - c[0, -1] + c[0, 0] + z * (c[0, -1] + c[0, 0]) / 2 * (Phi[0, -1] - Phi[0, 0])
xy_staggered = - c[-1, -1] + c[0, 0] + z * (c[-1, -1] + c[0, 0]) / 2 * (Phi[-1, -1] - Phi[0, 0])
xY_staggered = - c[-1, 1] + c[0, 0] + z * (c[-1, 1] + c[0, 0]) / 2 * (Phi[-1, 1] - Phi[0, 0])
jj = j.staggered_access
divergence = -1 / (1 + sp.sqrt(2) if j.index_shape[0] == 4 else 1) * \
sum([jj(d) / sp.Matrix(ps.stencil.direction_string_to_offset(d)).norm() for d in j.staggered_stencil
+ [ps.stencil.inverse_direction_string(d) for d in j.staggered_stencil]])
update = [ps.Assignment(c.center, c.center + divergence)]
flux = [ps.Assignment(j.staggered_access("W"), D * x_staggered),
ps.Assignment(j.staggered_access("S"), D * y_staggered)]
if j.index_shape[0] == 4:
flux += [ps.Assignment(j.staggered_access("SW"), D * xy_staggered),
ps.Assignment(j.staggered_access("NW"), D * xY_staggered)]
# compare
for a, b in zip(flux, flux_assignments):
assert a.lhs == b.lhs
assert sp.simplify(a.rhs - b.rhs) == 0
for a, b in zip(update, continuity_assignments):
assert a.lhs == b.lhs
assert a.rhs == b.rhs
# TODO: test source