### Thermalized LB: test 4 rel time model velocity and pressure

 """ This tests that for the thermalized LB (MRT with 15 equal relaxation times), the correct temperature is obtained and the velocity distribution matches the Maxwell-Boltzmann distribution """ """Tests velocity and stress fluctuations for thermalized LB""" import pystencils as ps from lbmpy.lbstep import LatticeBoltzmannStep from lbmpy.creationfunctions import create_lb_collision_rule from lbmpy.relaxationrates import relaxation_rate_from_lattice_viscosity, relaxation_rate_from_magic_number from lbmpy.creationfunctions import * from lbmpy.macroscopic_value_kernels import macroscopic_values_setter import numpy as np import pickle import gzip from time import time from lbmpy.moments import is_bulk_moment, is_shear_moment, get_order def single_component_maxwell(x1, x2, kT): def single_component_maxwell(x1, x2, kT, mass): """Integrate the probability density from x1 to x2 using the trapezoidal rule""" x = np.linspace(x1, x2, 1000) return np.trapz(np.exp(-x**2 / (2. * kT)), x) / np.sqrt(2. * np.pi * kT) def run_scenario(scenario, steps): scenario.pre_run() for t in range(scenario.time_steps_run, scenario.time_steps_run + steps): scenario.kernel_params['time_step'] = t scenario.time_step() scenario.post_run() scenario.time_steps_run += steps def create_scenario(domain_size, temperature=None, viscosity=None, seed=2, target='cpu', openmp=4, num_rel_rates=None): rr = [relaxation_rate_from_lattice_viscosity(viscosity)] rr = rr*num_rel_rates cr = create_lb_collision_rule( stencil='D3Q19', compressible=True, method='mrt', weighted=True, relaxation_rates=rr, fluctuating={'temperature': temperature, 'seed': seed}, optimization={'cse_global': True, 'split': False, 'cse_pdfs': True, 'vectorization': True} return np.trapz(np.exp(-mass * x**2 / (2. * kT)), x) / np.sqrt(2. * np.pi * kT/mass) def rr_getter(moment_group): """Mapos moments to 4 relaxation rates for shear, bulk, odd, even modes""" is_shear = [is_shear_moment(m, 3) for m in moment_group] is_bulk = [is_bulk_moment(m, 3) for m in moment_group] order = [get_order(m) for m in moment_group] assert min(order) == max(order) order = order[0] if order < 2: return 0 elif any(is_bulk): assert all(is_bulk) return sp.Symbol("omega_bulk") elif any(is_shear): assert all(is_shear) return sp.Symbol("omega_shear") elif order % 2 == 0: assert order > 2 return sp.Symbol("omega_even") else: return sp.Symbol("omega_odd") def second_order_moment_tensor_assignments(function_values, stencil, output_field): """Assignments for calculating the pressure tensor""" assert len(function_values) == len(stencil) dim = len(stencil[0]) return [ps.Assignment(output_field(i, j), sum(c[i] * c[j] * f for f, c in zip(function_values, stencil))) for i in range(dim) for j in range(dim)] def add_pressure_output_to_collision_rule(collision_rule, pressure_field): pressure_ouput = second_order_moment_tensor_assignments(collision_rule.method.pre_collision_pdf_symbols, collision_rule.method.stencil, pressure_field) collision_rule.main_assignments = collision_rule.main_assignments + pressure_ouput def test(): # Parameters stencil = get_stencil('D3Q19') L = [60]*3 kT = 4E-4 rho_0 = 0.8 omega_shear = 0.8 omega_bulk = 0.3 omega_even = 0.5 omega_odd = 0.4 target = 'cpu' # Setup data handling dh = ps.create_data_handling(L, periodicity=True, default_target=target) src = dh.add_array('src', values_per_cell=len(stencil), layout='f') dst = dh.add_array_like('dst', 'src') rho = dh.add_array('rho', layout='f', latex_name='\\rho') u = dh.add_array('u', values_per_cell=dh.dim, layout='f') pressure_field = dh.add_array('pressure', values_per_cell=( 3, 3), layout='f', gpu=target == 'gpu') force_field = dh.add_array( 'force', values_per_cell=3, layout='f', gpu=target == 'gpu') # Method setup method = create_mrt_orthogonal( stencil=get_stencil('D3Q19'), compressible=True, weighted=True, relaxation_rate_getter=rr_getter, force_model=force_model_from_string('luo', force_field.center_vector)) collision_rule = create_lb_collision_rule( method, fluctuating={ 'temperature': kT }, optimization={'cse_global': True} ) return LatticeBoltzmannStep(periodicity=(True, True, True), domain_size=domain_size, compressible=True, stencil='D3Q19', collision_rule=cr, optimization={'target': target, 'openmp': openmp}) def test_fluctuating_mrt(): # Unit conversions (MD to lattice) for parameters known to work with Espresso agrid = 1. m = 1. # mass per node tau = 0.01 # time step temperature = 4. / (m * agrid**2/tau**2) viscosity = 3. * tau / agrid**2 n = 8 sc = create_scenario((n, n, n), viscosity=viscosity, temperature=temperature, target='cpu', openmp=4, num_rel_rates=15) assert np.average(sc.velocity[:, :, :]) == 0. # Warmup run_scenario(sc, steps=500) # sampling: steps = 20000 v = np.zeros((steps, n, n, n, 3)) for i in range(steps): run_scenario(sc, steps=2) v[i, :, :, :, :] = np.copy(sc.velocity[:, :, :, :]) v = v.reshape((steps*n*n*n, 3)) np.testing.assert_allclose(np.mean(v, axis=0), [0, 0, 0], atol=6E-7) np.testing.assert_allclose( np.var(v, axis=0), [temperature, temperature, temperature], rtol=1E-2) v_hist, v_bins = np.histogram(v, bins=11, range=(-.08, .08), density=True) # Calculate expected values from single v_expected = [] for i in range(len(v_hist)): # Maxwell distribution res = np.exp(-v_bins[i]**2/(2.*temperature)) / \ np.sqrt(2*np.pi*temperature) res = 1./(v_bins[i+1]-v_bins[i]) * \ single_component_maxwell(v_bins[i], v_bins[i+1], temperature) v_expected.append(res) v_expected = np.array(v_expected) # 8% accuracy on the entire histogram np.testing.assert_allclose(v_hist, v_expected, rtol=0.08) # 0.5% accuracy on the middle part remove = 3 np.testing.assert_allclose( v_hist[remove:-remove], v_expected[remove:-remove], rtol=0.005) add_pressure_output_to_collision_rule(collision_rule, pressure_field) collision = create_lb_update_rule(collision_rule=collision_rule, stencil=stencil, method=method, compressible=True, kernel_type='collide_only', optimization={'symbolic_field': src}) stream = create_stream_pull_with_output_kernel(collision.method, src, dst, {'density': rho, 'velocity': u}) opts = {'cpu_openmp': True, 'cpu_vectorize_info': None, 'target': dh.default_target} # Compile kernels stream_kernel = ps.create_kernel(stream, **opts).compile() collision_kernel = ps.create_kernel(collision, **opts).compile() sync_pdfs = dh.synchronization_function([src.name]) # Initialization init = macroscopic_values_setter(collision.method, velocity=(0,)*dh.dim, pdfs=src.center_vector, density=rho.center) init_kernel = ps.create_kernel(init, ghost_layers=0).compile() dh.fill(rho.name, rho_0) dh.fill(u.name, np.nan, ghost_layers=True, inner_ghost_layers=True) dh.fill(u.name, 0) dh.run_kernel(init_kernel) # time loop def time_loop(start, steps): dh.all_to_gpu() for i in range(start, start+steps): dh.run_kernel(collision_kernel, omega_shear=omega_shear, omega_bulk=omega_bulk, omega_odd=omega_odd, omega_even=omega_even, seed=42, time_step=i) sync_pdfs() dh.run_kernel(stream_kernel) dh.swap(src.name, dst.name) return start+steps # Test t = 0 # warm up t = time_loop(t, 10) # Measurement for i in range(10): t = time_loop(t, 5) res_u = dh.gather_array("u").reshape((-1, 3)) res_rho = dh.gather_array("rho").reshape((-1,)) # mass conservation np.testing.assert_allclose(np.mean(res_rho), rho_0, atol=3E-12) # momentum conservation momentum = np.dot(res_rho, res_u) np.testing.assert_allclose(momentum, [0, 0, 0], atol=1E-10) # temperature kinetic_energy = 1/2*np.dot(res_rho, res_u*res_u)/np.product(L) np.testing.assert_allclose( kinetic_energy, [kT/2]*3, atol=kT*0.01) # velocity distribution v_hist, v_bins = np.histogram( res_u, bins=11, range=(-.075, .075), density=True) # Calculate expected values from single v_expected = [] for j in range(len(v_hist)): # Maxwell distribution res = 1./(v_bins[j+1]-v_bins[j]) * \ single_component_maxwell( v_bins[j], v_bins[j+1], kT, rho_0) v_expected.append(res) v_expected = np.array(v_expected) # 10% accuracy on the entire histogram np.testing.assert_allclose(v_hist, v_expected, rtol=0.1) # 1% accuracy on the middle part remove = 3 np.testing.assert_allclose( v_hist[remove:-remove], v_expected[remove:-remove], rtol=0.01) # pressure tensor against expressions from # Duenweg, Schiller, Ladd, https://arxiv.org/abs/0707.1581 res_pressure = dh.gather_array("pressure").reshape((-1, 3, 3)) c_s = np.sqrt(1/3) # speed of sound # average of pressure tensor # Diagonal elements are rho c_s^22 +. When the fluid is # thermalized, the expectation value of = kT due to the # equi-partition theorem. p_av_expected = np.diag([rho_0*c_s**2 + kT]*3) np.testing.assert_allclose( np.mean(res_pressure, axis=0), p_av_expected, atol=c_s**2/2000)
 import numpy as np from lbmpy.scenarios import create_channel def test_fluctuating_generation_pipeline(): ch = create_channel((10, 10), stencil='D2Q9', method='mrt', weighted=True, relaxation_rates=[1.5] * 5, force=1e-5, force_model='luo', fluctuating={'temperature': 1e-9}, kernel_params={'time_step': 1, 'seed': 312}, optimization={'cse_global': True}) ch.run(10) assert np.max(ch.velocity[:, :]) < 0.1
