Skip to content
Snippets Groups Projects
Commit 4a55e603 authored by itischler's avatar itischler Committed by Markus Holzer
Browse files

Fvm testcase with fluctuations and reactions

parent 6f87eb5d
No related merge requests found
...@@ -3,6 +3,7 @@ import pystencils as ps ...@@ -3,6 +3,7 @@ import pystencils as ps
import numpy as np import numpy as np
import pytest import pytest
from itertools import product from itertools import product
from pystencils.rng import random_symbol
def advection_diffusion(dim: int): def advection_diffusion(dim: int):
...@@ -125,6 +126,297 @@ def test_advection_diffusion_3d(velocity): ...@@ -125,6 +126,297 @@ def test_advection_diffusion_3d(velocity):
advection_diffusion.runners[3](velocity) advection_diffusion.runners[3](velocity)
def advection_diffusion_fluctuations(dim: int):
# parameters
if dim == 2:
L = (32, 32)
stencil_factor = np.sqrt(1 / (1 + np.sqrt(2)))
elif dim == 3:
L = (16, 16, 16)
stencil_factor = np.sqrt(1 / (1 + 2 * np.sqrt(2) + 4.0 / 3.0 * np.sqrt(3)))
dh = ps.create_data_handling(domain_size=L, periodicity=True, default_target=ps.Target.CPU)
n_field = dh.add_array('n', values_per_cell=1)
j_field = dh.add_array('j', values_per_cell=3 ** dim // 2, field_type=ps.FieldType.STAGGERED_FLUX)
velocity_field = dh.add_array('v', values_per_cell=dim)
D = 0.00666
time = 10000
def grad(f):
return sp.Matrix([ps.fd.diff(f, i) for i in range(dim)])
flux_eq = - D * grad(n_field)
fvm_eq = ps.fd.FVM1stOrder(n_field, flux=flux_eq)
vof_adv = ps.fd.VOF(j_field, velocity_field, n_field)
# merge calculation of advection and diffusion terms
flux = []
for adv, div in zip(vof_adv, fvm_eq.discrete_flux(j_field)):
assert adv.lhs == div.lhs
flux.append(ps.Assignment(adv.lhs, adv.rhs + div.rhs))
flux = ps.AssignmentCollection(flux)
rng_symbol_gen = random_symbol(flux.subexpressions, dim=dh.dim)
for i in range(len(flux.main_assignments)):
n = j_field.staggered_stencil[i]
assert flux.main_assignments[i].lhs == j_field.staggered_access(n)
# calculate mean density
dens = (n_field.neighbor_vector(n) + n_field.center_vector)[0] / 2
# multyply by smoothed haviside function so that fluctuation will not get bigger that the density
dens *= sp.Max(0, sp.Min(1.0, n_field.neighbor_vector(n)[0]) * sp.Min(1.0, n_field.center_vector[0]))
# lenght of the vector
length = sp.sqrt(len(j_field.staggered_stencil[i]))
# amplitude of the random fluctuations
fluct = sp.sqrt(2 * dens * D) * sp.sqrt(1 / length) * stencil_factor
# add fluctuations
fluct *= 2 * (next(rng_symbol_gen) - 0.5) * sp.sqrt(3)
flux.main_assignments[i] = ps.Assignment(flux.main_assignments[i].lhs, flux.main_assignments[i].rhs + fluct)
# Add the folding to the flux, so that the random numbers persist through the ghostlayers.
fold = {ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i):
ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i) % L[i] for i in range(len(L))}
flux.subs(fold)
flux_kernel = ps.create_staggered_kernel(flux).compile()
pde_kernel = ps.create_kernel(fvm_eq.discrete_continuity(j_field)).compile()
sync_conc = dh.synchronization_function([n_field.name])
# analytical density distribution calculation
def P(rho, density_init):
res = []
for r in rho:
res.append(np.power(density_init, r) * np.exp(-density_init) / np.math.gamma(r + 1))
return np.array(res)
def run(density_init: float, velocity: np.ndarray, time: int):
dh.fill(n_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(j_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
# set initial values for velocity and density
for i in range(dim):
dh.fill(velocity_field.name, velocity[i], i, ghost_layers=True, inner_ghost_layers=True)
dh.fill(n_field.name, density_init)
measurement_intervall = 10
warm_up = 1000
data = []
sync_conc()
for i in range(warm_up):
dh.run_kernel(flux_kernel, seed=42, time_step=i)
dh.run_kernel(pde_kernel)
sync_conc()
for i in range(time):
dh.run_kernel(flux_kernel, seed=42, time_step=i + warm_up)
dh.run_kernel(pde_kernel)
sync_conc()
if(i % measurement_intervall == 0):
data = np.append(data, dh.gather_array(n_field.name).ravel(), 0)
# test mass conservation
np.testing.assert_almost_equal(dh.gather_array(n_field.name).mean(), density_init)
n_bins = 50
density_value, bins = np.histogram(data, density=True, bins=n_bins)
bins_mean = bins[:-1] + (bins[1:] - bins[:-1]) / 2
analytical_value = P(bins_mean, density_init)
print(density_value - analytical_value)
np.testing.assert_allclose(density_value, analytical_value, atol=2e-3)
return lambda density_init, v: run(density_init, np.array(v), time)
advection_diffusion_fluctuations.runners = {}
@pytest.mark.parametrize("velocity", list(product([0, 0.00041], [0, -0.00031])))
@pytest.mark.parametrize("density", [27.0, 56.5])
@pytest.mark.longrun
def test_advection_diffusion_fluctuation_2d(density, velocity):
if 2 not in advection_diffusion_fluctuations.runners:
advection_diffusion_fluctuations.runners[2] = advection_diffusion_fluctuations(2)
advection_diffusion_fluctuations.runners[2](density, velocity)
@pytest.mark.parametrize("velocity", [(0.0, 0.0, 0.0), (0.00043, -0.00017, 0.00028)])
@pytest.mark.parametrize("density", [27.0, 56.5])
@pytest.mark.longrun
def test_advection_diffusion_fluctuation_3d(density, velocity):
if 3 not in advection_diffusion_fluctuations.runners:
advection_diffusion_fluctuations.runners[3] = advection_diffusion_fluctuations(3)
advection_diffusion_fluctuations.runners[3](density, velocity)
def diffusion_reaction(fluctuations: bool):
# parameters
L = (32, 32)
stencil_factor = np.sqrt(1 / (1 + np.sqrt(2)))
dh = ps.create_data_handling(domain_size=L, periodicity=True, default_target=ps.Target.CPU)
species = 2
n_fields = []
j_fields = []
r_flux_fields = []
for i in range(species):
n_fields.append(dh.add_array(f'n_{i}', values_per_cell=1))
j_fields.append(dh.add_array(f'j_{i}', values_per_cell=3 ** dh.dim // 2,
field_type=ps.FieldType.STAGGERED_FLUX))
r_flux_fields.append(dh.add_array(f'r_{i}', values_per_cell=1))
velocity_field = dh.add_array('v', values_per_cell=dh.dim)
D = 0.00666
time = 1000
r_order = [2.0, 0.0]
r_rate_const = 0.00001
r_coefs = [-2, 1]
def grad(f):
return sp.Matrix([ps.fd.diff(f, i) for i in range(dh.dim)])
flux_eq = - D * grad(n_fields[0])
fvm_eq = ps.fd.FVM1stOrder(n_fields[0], flux=flux_eq)
vof_adv = ps.fd.VOF(j_fields[0], velocity_field, n_fields[0])
continuity_assignments = fvm_eq.discrete_continuity(j_fields[0])
# merge calculation of advection and diffusion terms
flux = []
for adv, div in zip(vof_adv, fvm_eq.discrete_flux(j_fields[0])):
assert adv.lhs == div.lhs
flux.append(ps.Assignment(adv.lhs, adv.rhs + div.rhs))
flux = ps.AssignmentCollection(flux)
if(fluctuations):
rng_symbol_gen = random_symbol(flux.subexpressions, dim=dh.dim)
for i in range(len(flux.main_assignments)):
n = j_fields[0].staggered_stencil[i]
assert flux.main_assignments[i].lhs == j_fields[0].staggered_access(n)
# calculate mean density
dens = (n_fields[0].neighbor_vector(n) + n_fields[0].center_vector)[0] / 2
# multyply by smoothed haviside function so that fluctuation will not get bigger that the density
dens *= sp.Max(0,
sp.Min(1.0, n_fields[0].neighbor_vector(n)[0]) * sp.Min(1.0, n_fields[0].center_vector[0]))
# lenght of the vector
length = sp.sqrt(len(j_fields[0].staggered_stencil[i]))
# amplitude of the random fluctuations
fluct = sp.sqrt(2 * dens * D) * sp.sqrt(1 / length) * stencil_factor
# add fluctuations
fluct *= 2 * (next(rng_symbol_gen) - 0.5) * sp.sqrt(3)
flux.main_assignments[i] = ps.Assignment(flux.main_assignments[i].lhs, flux.main_assignments[i].rhs + fluct)
# Add the folding to the flux, so that the random numbers persist through the ghostlayers.
fold = {ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i):
ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i) % L[i] for i in range(len(L))}
flux.subs(fold)
r_flux = ps.AssignmentCollection([ps.Assignment(j_fields[i].center, 0) for i in range(species)])
reaction = r_rate_const
for i in range(species):
reaction *= sp.Pow(n_fields[i].center, r_order[i])
if(fluctuations):
rng_symbol_gen = random_symbol(r_flux.subexpressions, dim=dh.dim)
reaction_fluctuations = sp.sqrt(sp.Abs(reaction)) * 2 * (next(rng_symbol_gen) - 0.5) * sp.sqrt(3)
reaction_fluctuations *= sp.Min(1, sp.Abs(reaction**2))
else:
reaction_fluctuations = 0.0
for i in range(species):
r_flux.main_assignments[i] = ps.Assignment(
r_flux_fields[i].center, (reaction + reaction_fluctuations) * r_coefs[i])
continuity_assignments.append(ps.Assignment(n_fields[0].center, n_fields[0].center + r_flux_fields[0].center))
flux_kernel = ps.create_staggered_kernel(flux).compile()
reaction_kernel = ps.create_kernel(r_flux).compile()
pde_kernel = ps.create_kernel(continuity_assignments).compile()
sync_conc = dh.synchronization_function([n_fields[0].name, n_fields[1].name])
def f(t, r, n0, fac, fluctuations):
"""Calculates the amount of product created after a certain time of a reaction with form xA -> B
Args:
t: Time of the reation
r: Reaction rate constant
n0: Initial density of the
fac: Reaction order of A (this in most cases equals the stochometric coefficient x)
fluctuations: Boolian whether fluctuations were included during the reaction.
"""
if fluctuations:
return 1 / fac * (n0 + n0 / (n0 - (n0 + 1) * np.exp(fac * r * t)))
return 1 / fac * (n0 - (1 / (fac * r * t + (1 / n0))))
def run(density_init: float, velocity: np.ndarray, time: int):
for i in range(species):
dh.fill(n_fields[i].name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(j_fields[i].name, 0.0, ghost_layers=True, inner_ghost_layers=True)
dh.fill(r_flux_fields[i].name, 0.0, ghost_layers=True, inner_ghost_layers=True)
# set initial values for velocity and density
for i in range(dh.dim):
dh.fill(velocity_field.name, velocity[i], i, ghost_layers=True, inner_ghost_layers=True)
dh.fill(n_fields[0].name, density_init)
dh.fill(n_fields[1].name, 0.0)
measurement_intervall = 10
data = []
sync_conc()
for i in range(time):
if(i % measurement_intervall == 0):
data.append([i, dh.gather_array(n_fields[1].name).mean(), dh.gather_array(n_fields[0].name).mean()])
dh.run_kernel(reaction_kernel, seed=41, time_step=i)
for s_idx in range(species):
flux_kernel(n_0=dh.cpu_arrays[n_fields[s_idx].name],
j_0=dh.cpu_arrays[j_fields[s_idx].name],
v=dh.cpu_arrays[velocity_field.name], seed=42 + s_idx, time_step=i)
pde_kernel(n_0=dh.cpu_arrays[n_fields[s_idx].name],
j_0=dh.cpu_arrays[j_fields[s_idx].name],
r_0=dh.cpu_arrays[r_flux_fields[s_idx].name])
sync_conc()
data = np.array(data).transpose()
x = data[0]
analytical_value = f(x, r_rate_const, density_init, abs(r_coefs[0]), fluctuations)
# test mass conservation
np.testing.assert_almost_equal(
dh.gather_array(n_fields[0].name).mean() + 2 * dh.gather_array(n_fields[1].name).mean(), density_init)
r_tol = 2e-3
if fluctuations:
r_tol = 3e-2
np.testing.assert_allclose(data[1], analytical_value, rtol=r_tol)
return lambda density_init, v: run(density_init, np.array(v), time)
advection_diffusion_fluctuations.runners = {}
@pytest.mark.parametrize("velocity", list(product([0, 0.0041], [0, -0.0031])))
@pytest.mark.parametrize("density", [27.0, 56.5])
@pytest.mark.parametrize("fluctuations", [False, True])
@pytest.mark.longrun
def test_diffusion_reaction(density, velocity, fluctuations):
diffusion_reaction.runner = diffusion_reaction(fluctuations)
diffusion_reaction.runner(density, velocity)
def VOF2(j: ps.field.Field, v: ps.field.Field, ρ: ps.field.Field, simplify=True): def VOF2(j: ps.field.Field, v: ps.field.Field, ρ: ps.field.Field, simplify=True):
"""Volume-of-fluid discretization of advection """Volume-of-fluid discretization of advection
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment