Skip to content
Snippets Groups Projects

Fvm testcase with fluctuations and reactions

Merged itischler requested to merge itischler/pystencils:fvm_testcase into master
Compare and
1 file
+ 292
0
Preferences
Compare changes
+ 292
0
@@ -3,6 +3,7 @@ import pystencils as ps
import numpy as np
import pytest
from itertools import product
from pystencils.rng import random_symbol
def advection_diffusion(dim: int):
@@ -125,6 +126,297 @@ def test_advection_diffusion_3d(velocity):
advection_diffusion.runners[3](velocity)
def advection_diffusion_fluctuations(dim: int):
# parameters
if dim == 2:
L = (32, 32)
stencil_factor = np.sqrt(1 / (1 + np.sqrt(2)))
elif dim == 3:
L = (16, 16, 16)
stencil_factor = np.sqrt(1 / (1 + 2 * np.sqrt(2) + 4.0 / 3.0 * np.sqrt(3)))
dh = ps.create_data_handling(domain_size=L, periodicity=True, default_target=ps.Target.CPU)
n_field = dh.add_array('n', values_per_cell=1)
j_field = dh.add_array('j', values_per_cell=3 ** dim // 2, field_type=ps.FieldType.STAGGERED_FLUX)
velocity_field = dh.add_array('v', values_per_cell=dim)
D = 0.00666
time = 10000
def grad(f):
return sp.Matrix([ps.fd.diff(f, i) for i in range(dim)])
flux_eq = - D * grad(n_field)
fvm_eq = ps.fd.FVM1stOrder(n_field, flux=flux_eq)
vof_adv = ps.fd.VOF(j_field, velocity_field, n_field)
# merge calculation of advection and diffusion terms
flux = []
for adv, div in zip(vof_adv, fvm_eq.discrete_flux(j_field)):
assert adv.lhs == div.lhs
flux.append(ps.Assignment(adv.lhs, adv.rhs + div.rhs))
flux = ps.AssignmentCollection(flux)
rng_symbol_gen = random_symbol(flux.subexpressions, dim=dh.dim)
for i in range(len(flux.main_assignments)):
n = j_field.staggered_stencil[i]
assert flux.main_assignments[i].lhs == j_field.staggered_access(n)
# calculate mean density
dens = (n_field.neighbor_vector(n) + n_field.center_vector)[0] / 2
# multyply by smoothed haviside function so that fluctuation will not get bigger that the density
dens *= sp.Max(0, sp.Min(1.0, n_field.neighbor_vector(n)[0]) * sp.Min(1.0, n_field.center_vector[0]))
# lenght of the vector
length = sp.sqrt(len(j_field.staggered_stencil[i]))
# amplitude of the random fluctuations
fluct = sp.sqrt(2 * dens * D) * sp.sqrt(1 / length) * stencil_factor
# add fluctuations
fluct *= 2 * (next(rng_symbol_gen) - 0.5) * sp.sqrt(3)
flux.main_assignments[i] = ps.Assignment(flux.main_assignments[i].lhs, flux.main_assignments[i].rhs + fluct)
# Add the folding to the flux, so that the random numbers persist through the ghostlayers.
fold = {ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i):
ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i) % L[i] for i in range(len(L))}
flux.subs(fold)
flux_kernel = ps.create_staggered_kernel(flux).compile()
pde_kernel = ps.create_kernel(fvm_eq.discrete_continuity(j_field)).compile()
sync_conc = dh.synchronization_function([n_field.name])
# analytical density distribution calculation
def P(rho, density_init):
res = []
for r in rho:
res.append(np.power(density_init, r) * np.exp(-density_init) / np.math.gamma(r + 1))
return np.array(res)
def run(density_init: float, velocity: np.ndarray, time: int):
dh.fill(n_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(j_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
# set initial values for velocity and density
for i in range(dim):
dh.fill(velocity_field.name, velocity[i], i, ghost_layers=True, inner_ghost_layers=True)
dh.fill(n_field.name, density_init)
measurement_intervall = 10
warm_up = 1000
data = []
sync_conc()
for i in range(warm_up):
dh.run_kernel(flux_kernel, seed=42, time_step=i)
dh.run_kernel(pde_kernel)
sync_conc()
for i in range(time):
dh.run_kernel(flux_kernel, seed=42, time_step=i + warm_up)
dh.run_kernel(pde_kernel)
sync_conc()
if(i % measurement_intervall == 0):
data = np.append(data, dh.gather_array(n_field.name).ravel(), 0)
# test mass conservation
np.testing.assert_almost_equal(dh.gather_array(n_field.name).mean(), density_init)
n_bins = 50
density_value, bins = np.histogram(data, density=True, bins=n_bins)
bins_mean = bins[:-1] + (bins[1:] - bins[:-1]) / 2
analytical_value = P(bins_mean, density_init)
print(density_value - analytical_value)
np.testing.assert_allclose(density_value, analytical_value, atol=2e-3)
return lambda density_init, v: run(density_init, np.array(v), time)
advection_diffusion_fluctuations.runners = {}
@pytest.mark.parametrize("velocity", list(product([0, 0.00041], [0, -0.00031])))
@pytest.mark.parametrize("density", [27.0, 56.5])
@pytest.mark.longrun
def test_advection_diffusion_fluctuation_2d(density, velocity):
if 2 not in advection_diffusion_fluctuations.runners:
advection_diffusion_fluctuations.runners[2] = advection_diffusion_fluctuations(2)
advection_diffusion_fluctuations.runners[2](density, velocity)
@pytest.mark.parametrize("velocity", [(0.0, 0.0, 0.0), (0.00043, -0.00017, 0.00028)])
@pytest.mark.parametrize("density", [27.0, 56.5])
@pytest.mark.longrun
def test_advection_diffusion_fluctuation_3d(density, velocity):
if 3 not in advection_diffusion_fluctuations.runners:
advection_diffusion_fluctuations.runners[3] = advection_diffusion_fluctuations(3)
advection_diffusion_fluctuations.runners[3](density, velocity)
def diffusion_reaction(fluctuations: bool):
# parameters
L = (32, 32)
stencil_factor = np.sqrt(1 / (1 + np.sqrt(2)))
dh = ps.create_data_handling(domain_size=L, periodicity=True, default_target=ps.Target.CPU)
species = 2
n_fields = []
j_fields = []
r_flux_fields = []
for i in range(species):
n_fields.append(dh.add_array(f'n_{i}', values_per_cell=1))
j_fields.append(dh.add_array(f'j_{i}', values_per_cell=3 ** dh.dim // 2,
field_type=ps.FieldType.STAGGERED_FLUX))
r_flux_fields.append(dh.add_array(f'r_{i}', values_per_cell=1))
velocity_field = dh.add_array('v', values_per_cell=dh.dim)
D = 0.00666
time = 1000
r_order = [2.0, 0.0]
r_rate_const = 0.00001
r_coefs = [-2, 1]
def grad(f):
return sp.Matrix([ps.fd.diff(f, i) for i in range(dh.dim)])
flux_eq = - D * grad(n_fields[0])
fvm_eq = ps.fd.FVM1stOrder(n_fields[0], flux=flux_eq)
vof_adv = ps.fd.VOF(j_fields[0], velocity_field, n_fields[0])
continuity_assignments = fvm_eq.discrete_continuity(j_fields[0])
# merge calculation of advection and diffusion terms
flux = []
for adv, div in zip(vof_adv, fvm_eq.discrete_flux(j_fields[0])):
assert adv.lhs == div.lhs
flux.append(ps.Assignment(adv.lhs, adv.rhs + div.rhs))
flux = ps.AssignmentCollection(flux)
if(fluctuations):
rng_symbol_gen = random_symbol(flux.subexpressions, dim=dh.dim)
for i in range(len(flux.main_assignments)):
n = j_fields[0].staggered_stencil[i]
assert flux.main_assignments[i].lhs == j_fields[0].staggered_access(n)
# calculate mean density
dens = (n_fields[0].neighbor_vector(n) + n_fields[0].center_vector)[0] / 2
# multyply by smoothed haviside function so that fluctuation will not get bigger that the density
dens *= sp.Max(0,
sp.Min(1.0, n_fields[0].neighbor_vector(n)[0]) * sp.Min(1.0, n_fields[0].center_vector[0]))
# lenght of the vector
length = sp.sqrt(len(j_fields[0].staggered_stencil[i]))
# amplitude of the random fluctuations
fluct = sp.sqrt(2 * dens * D) * sp.sqrt(1 / length) * stencil_factor
# add fluctuations
fluct *= 2 * (next(rng_symbol_gen) - 0.5) * sp.sqrt(3)
flux.main_assignments[i] = ps.Assignment(flux.main_assignments[i].lhs, flux.main_assignments[i].rhs + fluct)
# Add the folding to the flux, so that the random numbers persist through the ghostlayers.
fold = {ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i):
ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i) % L[i] for i in range(len(L))}
flux.subs(fold)
r_flux = ps.AssignmentCollection([ps.Assignment(j_fields[i].center, 0) for i in range(species)])
reaction = r_rate_const
for i in range(species):
reaction *= sp.Pow(n_fields[i].center, r_order[i])
if(fluctuations):
rng_symbol_gen = random_symbol(r_flux.subexpressions, dim=dh.dim)
reaction_fluctuations = sp.sqrt(sp.Abs(reaction)) * 2 * (next(rng_symbol_gen) - 0.5) * sp.sqrt(3)
reaction_fluctuations *= sp.Min(1, sp.Abs(reaction**2))
else:
reaction_fluctuations = 0.0
for i in range(species):
r_flux.main_assignments[i] = ps.Assignment(
r_flux_fields[i].center, (reaction + reaction_fluctuations) * r_coefs[i])
continuity_assignments.append(ps.Assignment(n_fields[0].center, n_fields[0].center + r_flux_fields[0].center))
flux_kernel = ps.create_staggered_kernel(flux).compile()
reaction_kernel = ps.create_kernel(r_flux).compile()
pde_kernel = ps.create_kernel(continuity_assignments).compile()
sync_conc = dh.synchronization_function([n_fields[0].name, n_fields[1].name])
def f(t, r, n0, fac, fluctuations):
"""Calculates the amount of product created after a certain time of a reaction with form xA -> B
Args:
t: Time of the reation
r: Reaction rate constant
n0: Initial density of the
fac: Reaction order of A (this in most cases equals the stochometric coefficient x)
fluctuations: Boolian whether fluctuations were included during the reaction.
"""
if fluctuations:
return 1 / fac * (n0 + n0 / (n0 - (n0 + 1) * np.exp(fac * r * t)))
return 1 / fac * (n0 - (1 / (fac * r * t + (1 / n0))))
def run(density_init: float, velocity: np.ndarray, time: int):
for i in range(species):
dh.fill(n_fields[i].name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(j_fields[i].name, 0.0, ghost_layers=True, inner_ghost_layers=True)
dh.fill(r_flux_fields[i].name, 0.0, ghost_layers=True, inner_ghost_layers=True)
# set initial values for velocity and density
for i in range(dh.dim):
dh.fill(velocity_field.name, velocity[i], i, ghost_layers=True, inner_ghost_layers=True)
dh.fill(n_fields[0].name, density_init)
dh.fill(n_fields[1].name, 0.0)
measurement_intervall = 10
data = []
sync_conc()
for i in range(time):
if(i % measurement_intervall == 0):
data.append([i, dh.gather_array(n_fields[1].name).mean(), dh.gather_array(n_fields[0].name).mean()])
dh.run_kernel(reaction_kernel, seed=41, time_step=i)
for s_idx in range(species):
flux_kernel(n_0=dh.cpu_arrays[n_fields[s_idx].name],
j_0=dh.cpu_arrays[j_fields[s_idx].name],
v=dh.cpu_arrays[velocity_field.name], seed=42 + s_idx, time_step=i)
pde_kernel(n_0=dh.cpu_arrays[n_fields[s_idx].name],
j_0=dh.cpu_arrays[j_fields[s_idx].name],
r_0=dh.cpu_arrays[r_flux_fields[s_idx].name])
sync_conc()
data = np.array(data).transpose()
x = data[0]
analytical_value = f(x, r_rate_const, density_init, abs(r_coefs[0]), fluctuations)
# test mass conservation
np.testing.assert_almost_equal(
dh.gather_array(n_fields[0].name).mean() + 2 * dh.gather_array(n_fields[1].name).mean(), density_init)
r_tol = 2e-3
if fluctuations:
r_tol = 3e-2
np.testing.assert_allclose(data[1], analytical_value, rtol=r_tol)
return lambda density_init, v: run(density_init, np.array(v), time)
advection_diffusion_fluctuations.runners = {}
@pytest.mark.parametrize("velocity", list(product([0, 0.0041], [0, -0.0031])))
@pytest.mark.parametrize("density", [27.0, 56.5])
@pytest.mark.parametrize("fluctuations", [False, True])
@pytest.mark.longrun
def test_diffusion_reaction(density, velocity, fluctuations):
diffusion_reaction.runner = diffusion_reaction(fluctuations)
diffusion_reaction.runner(density, velocity)
def VOF2(j: ps.field.Field, v: ps.field.Field, ρ: ps.field.Field, simplify=True):
"""Volume-of-fluid discretization of advection