diff --git a/pystencils_tests/test_fvm.py b/pystencils_tests/test_fvm.py index 478a9530872513edd3249a489490e98164fb2dac..f4c0a663ea069bcbbb2d694e82c14259adc97bfb 100644 --- a/pystencils_tests/test_fvm.py +++ b/pystencils_tests/test_fvm.py @@ -3,6 +3,7 @@ import pystencils as ps import numpy as np import pytest from itertools import product +from pystencils.rng import random_symbol def advection_diffusion(dim: int): @@ -125,6 +126,297 @@ def test_advection_diffusion_3d(velocity): advection_diffusion.runners[3](velocity) +def advection_diffusion_fluctuations(dim: int): + # parameters + if dim == 2: + L = (32, 32) + stencil_factor = np.sqrt(1 / (1 + np.sqrt(2))) + elif dim == 3: + L = (16, 16, 16) + stencil_factor = np.sqrt(1 / (1 + 2 * np.sqrt(2) + 4.0 / 3.0 * np.sqrt(3))) + + dh = ps.create_data_handling(domain_size=L, periodicity=True, default_target=ps.Target.CPU) + + n_field = dh.add_array('n', values_per_cell=1) + j_field = dh.add_array('j', values_per_cell=3 ** dim // 2, field_type=ps.FieldType.STAGGERED_FLUX) + velocity_field = dh.add_array('v', values_per_cell=dim) + + D = 0.00666 + time = 10000 + + def grad(f): + return sp.Matrix([ps.fd.diff(f, i) for i in range(dim)]) + + flux_eq = - D * grad(n_field) + fvm_eq = ps.fd.FVM1stOrder(n_field, flux=flux_eq) + + vof_adv = ps.fd.VOF(j_field, velocity_field, n_field) + + # merge calculation of advection and diffusion terms + flux = [] + for adv, div in zip(vof_adv, fvm_eq.discrete_flux(j_field)): + assert adv.lhs == div.lhs + flux.append(ps.Assignment(adv.lhs, adv.rhs + div.rhs)) + flux = ps.AssignmentCollection(flux) + + rng_symbol_gen = random_symbol(flux.subexpressions, dim=dh.dim) + for i in range(len(flux.main_assignments)): + n = j_field.staggered_stencil[i] + assert flux.main_assignments[i].lhs == j_field.staggered_access(n) + + # calculate mean density + dens = (n_field.neighbor_vector(n) + n_field.center_vector)[0] / 2 + # multyply by smoothed haviside function so that fluctuation will not get bigger that the density + dens *= sp.Max(0, sp.Min(1.0, n_field.neighbor_vector(n)[0]) * sp.Min(1.0, n_field.center_vector[0])) + + # lenght of the vector + length = sp.sqrt(len(j_field.staggered_stencil[i])) + + # amplitude of the random fluctuations + fluct = sp.sqrt(2 * dens * D) * sp.sqrt(1 / length) * stencil_factor + # add fluctuations + fluct *= 2 * (next(rng_symbol_gen) - 0.5) * sp.sqrt(3) + + flux.main_assignments[i] = ps.Assignment(flux.main_assignments[i].lhs, flux.main_assignments[i].rhs + fluct) + + # Add the folding to the flux, so that the random numbers persist through the ghostlayers. + fold = {ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i): + ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i) % L[i] for i in range(len(L))} + flux.subs(fold) + + flux_kernel = ps.create_staggered_kernel(flux).compile() + + pde_kernel = ps.create_kernel(fvm_eq.discrete_continuity(j_field)).compile() + + sync_conc = dh.synchronization_function([n_field.name]) + + # analytical density distribution calculation + def P(rho, density_init): + res = [] + for r in rho: + res.append(np.power(density_init, r) * np.exp(-density_init) / np.math.gamma(r + 1)) + return np.array(res) + + def run(density_init: float, velocity: np.ndarray, time: int): + dh.fill(n_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True) + dh.fill(j_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True) + + # set initial values for velocity and density + for i in range(dim): + dh.fill(velocity_field.name, velocity[i], i, ghost_layers=True, inner_ghost_layers=True) + dh.fill(n_field.name, density_init) + + measurement_intervall = 10 + warm_up = 1000 + data = [] + + sync_conc() + for i in range(warm_up): + dh.run_kernel(flux_kernel, seed=42, time_step=i) + dh.run_kernel(pde_kernel) + sync_conc() + + for i in range(time): + dh.run_kernel(flux_kernel, seed=42, time_step=i + warm_up) + dh.run_kernel(pde_kernel) + sync_conc() + if(i % measurement_intervall == 0): + data = np.append(data, dh.gather_array(n_field.name).ravel(), 0) + + # test mass conservation + np.testing.assert_almost_equal(dh.gather_array(n_field.name).mean(), density_init) + + n_bins = 50 + + density_value, bins = np.histogram(data, density=True, bins=n_bins) + bins_mean = bins[:-1] + (bins[1:] - bins[:-1]) / 2 + analytical_value = P(bins_mean, density_init) + print(density_value - analytical_value) + np.testing.assert_allclose(density_value, analytical_value, atol=2e-3) + + return lambda density_init, v: run(density_init, np.array(v), time) + + +advection_diffusion_fluctuations.runners = {} + + +@pytest.mark.parametrize("velocity", list(product([0, 0.00041], [0, -0.00031]))) +@pytest.mark.parametrize("density", [27.0, 56.5]) +@pytest.mark.longrun +def test_advection_diffusion_fluctuation_2d(density, velocity): + if 2 not in advection_diffusion_fluctuations.runners: + advection_diffusion_fluctuations.runners[2] = advection_diffusion_fluctuations(2) + advection_diffusion_fluctuations.runners[2](density, velocity) + + +@pytest.mark.parametrize("velocity", [(0.0, 0.0, 0.0), (0.00043, -0.00017, 0.00028)]) +@pytest.mark.parametrize("density", [27.0, 56.5]) +@pytest.mark.longrun +def test_advection_diffusion_fluctuation_3d(density, velocity): + if 3 not in advection_diffusion_fluctuations.runners: + advection_diffusion_fluctuations.runners[3] = advection_diffusion_fluctuations(3) + advection_diffusion_fluctuations.runners[3](density, velocity) + + +def diffusion_reaction(fluctuations: bool): + # parameters + L = (32, 32) + stencil_factor = np.sqrt(1 / (1 + np.sqrt(2))) + + dh = ps.create_data_handling(domain_size=L, periodicity=True, default_target=ps.Target.CPU) + + species = 2 + n_fields = [] + j_fields = [] + r_flux_fields = [] + for i in range(species): + n_fields.append(dh.add_array(f'n_{i}', values_per_cell=1)) + j_fields.append(dh.add_array(f'j_{i}', values_per_cell=3 ** dh.dim // 2, + field_type=ps.FieldType.STAGGERED_FLUX)) + r_flux_fields.append(dh.add_array(f'r_{i}', values_per_cell=1)) + velocity_field = dh.add_array('v', values_per_cell=dh.dim) + + D = 0.00666 + time = 1000 + r_order = [2.0, 0.0] + r_rate_const = 0.00001 + r_coefs = [-2, 1] + + def grad(f): + return sp.Matrix([ps.fd.diff(f, i) for i in range(dh.dim)]) + + flux_eq = - D * grad(n_fields[0]) + fvm_eq = ps.fd.FVM1stOrder(n_fields[0], flux=flux_eq) + vof_adv = ps.fd.VOF(j_fields[0], velocity_field, n_fields[0]) + continuity_assignments = fvm_eq.discrete_continuity(j_fields[0]) + # merge calculation of advection and diffusion terms + flux = [] + for adv, div in zip(vof_adv, fvm_eq.discrete_flux(j_fields[0])): + assert adv.lhs == div.lhs + flux.append(ps.Assignment(adv.lhs, adv.rhs + div.rhs)) + flux = ps.AssignmentCollection(flux) + + if(fluctuations): + rng_symbol_gen = random_symbol(flux.subexpressions, dim=dh.dim) + for i in range(len(flux.main_assignments)): + n = j_fields[0].staggered_stencil[i] + assert flux.main_assignments[i].lhs == j_fields[0].staggered_access(n) + + # calculate mean density + dens = (n_fields[0].neighbor_vector(n) + n_fields[0].center_vector)[0] / 2 + # multyply by smoothed haviside function so that fluctuation will not get bigger that the density + dens *= sp.Max(0, + sp.Min(1.0, n_fields[0].neighbor_vector(n)[0]) * sp.Min(1.0, n_fields[0].center_vector[0])) + + # lenght of the vector + length = sp.sqrt(len(j_fields[0].staggered_stencil[i])) + + # amplitude of the random fluctuations + fluct = sp.sqrt(2 * dens * D) * sp.sqrt(1 / length) * stencil_factor + # add fluctuations + fluct *= 2 * (next(rng_symbol_gen) - 0.5) * sp.sqrt(3) + + flux.main_assignments[i] = ps.Assignment(flux.main_assignments[i].lhs, flux.main_assignments[i].rhs + fluct) + + # Add the folding to the flux, so that the random numbers persist through the ghostlayers. + fold = {ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i): + ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i) % L[i] for i in range(len(L))} + flux.subs(fold) + + r_flux = ps.AssignmentCollection([ps.Assignment(j_fields[i].center, 0) for i in range(species)]) + reaction = r_rate_const + for i in range(species): + reaction *= sp.Pow(n_fields[i].center, r_order[i]) + if(fluctuations): + rng_symbol_gen = random_symbol(r_flux.subexpressions, dim=dh.dim) + reaction_fluctuations = sp.sqrt(sp.Abs(reaction)) * 2 * (next(rng_symbol_gen) - 0.5) * sp.sqrt(3) + reaction_fluctuations *= sp.Min(1, sp.Abs(reaction**2)) + else: + reaction_fluctuations = 0.0 + for i in range(species): + r_flux.main_assignments[i] = ps.Assignment( + r_flux_fields[i].center, (reaction + reaction_fluctuations) * r_coefs[i]) + + continuity_assignments.append(ps.Assignment(n_fields[0].center, n_fields[0].center + r_flux_fields[0].center)) + + flux_kernel = ps.create_staggered_kernel(flux).compile() + reaction_kernel = ps.create_kernel(r_flux).compile() + + pde_kernel = ps.create_kernel(continuity_assignments).compile() + + sync_conc = dh.synchronization_function([n_fields[0].name, n_fields[1].name]) + + def f(t, r, n0, fac, fluctuations): + """Calculates the amount of product created after a certain time of a reaction with form xA -> B + + Args: + t: Time of the reation + r: Reaction rate constant + n0: Initial density of the + fac: Reaction order of A (this in most cases equals the stochometric coefficient x) + fluctuations: Boolian whether fluctuations were included during the reaction. + """ + if fluctuations: + return 1 / fac * (n0 + n0 / (n0 - (n0 + 1) * np.exp(fac * r * t))) + return 1 / fac * (n0 - (1 / (fac * r * t + (1 / n0)))) + + def run(density_init: float, velocity: np.ndarray, time: int): + for i in range(species): + dh.fill(n_fields[i].name, np.nan, ghost_layers=True, inner_ghost_layers=True) + dh.fill(j_fields[i].name, 0.0, ghost_layers=True, inner_ghost_layers=True) + dh.fill(r_flux_fields[i].name, 0.0, ghost_layers=True, inner_ghost_layers=True) + + # set initial values for velocity and density + for i in range(dh.dim): + dh.fill(velocity_field.name, velocity[i], i, ghost_layers=True, inner_ghost_layers=True) + dh.fill(n_fields[0].name, density_init) + dh.fill(n_fields[1].name, 0.0) + + measurement_intervall = 10 + data = [] + + sync_conc() + for i in range(time): + if(i % measurement_intervall == 0): + data.append([i, dh.gather_array(n_fields[1].name).mean(), dh.gather_array(n_fields[0].name).mean()]) + dh.run_kernel(reaction_kernel, seed=41, time_step=i) + for s_idx in range(species): + flux_kernel(n_0=dh.cpu_arrays[n_fields[s_idx].name], + j_0=dh.cpu_arrays[j_fields[s_idx].name], + v=dh.cpu_arrays[velocity_field.name], seed=42 + s_idx, time_step=i) + pde_kernel(n_0=dh.cpu_arrays[n_fields[s_idx].name], + j_0=dh.cpu_arrays[j_fields[s_idx].name], + r_0=dh.cpu_arrays[r_flux_fields[s_idx].name]) + sync_conc() + + data = np.array(data).transpose() + x = data[0] + analytical_value = f(x, r_rate_const, density_init, abs(r_coefs[0]), fluctuations) + + # test mass conservation + np.testing.assert_almost_equal( + dh.gather_array(n_fields[0].name).mean() + 2 * dh.gather_array(n_fields[1].name).mean(), density_init) + + r_tol = 2e-3 + if fluctuations: + r_tol = 3e-2 + np.testing.assert_allclose(data[1], analytical_value, rtol=r_tol) + + return lambda density_init, v: run(density_init, np.array(v), time) + + +advection_diffusion_fluctuations.runners = {} + + +@pytest.mark.parametrize("velocity", list(product([0, 0.0041], [0, -0.0031]))) +@pytest.mark.parametrize("density", [27.0, 56.5]) +@pytest.mark.parametrize("fluctuations", [False, True]) +@pytest.mark.longrun +def test_diffusion_reaction(density, velocity, fluctuations): + diffusion_reaction.runner = diffusion_reaction(fluctuations) + diffusion_reaction.runner(density, velocity) + + def VOF2(j: ps.field.Field, v: ps.field.Field, Ï: ps.field.Field, simplify=True): """Volume-of-fluid discretization of advection