Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Showing
with 0 additions and 3192 deletions
import subprocess
def version_number_from_git(tag_prefix='release/', sha_length=10, version_format="{version}.dev{commits}+{sha}"):
def get_released_versions():
tags = sorted(subprocess.getoutput('git tag').split('\n'))
versions = [t[len(tag_prefix):] for t in tags if t.startswith(tag_prefix)]
return versions
def tag_from_version(v):
return tag_prefix + v
def increment_version(v):
parsed_version = [int(i) for i in v.split('.')]
parsed_version[-1] += 1
return '.'.join(str(i) for i in parsed_version)
latest_release = get_released_versions()[-1]
commits_since_tag = subprocess.getoutput('git rev-list {}..HEAD --count'.format(tag_from_version(latest_release)))
sha = subprocess.getoutput('git rev-parse HEAD')[:sha_length]
is_dirty = len(subprocess.getoutput("git status --untracked-files=no -s")) > 0
if int(commits_since_tag) == 0:
version_string = latest_release
else:
next_version = increment_version(latest_release)
version_string = version_format.format(version=next_version, commits=commits_since_tag, sha=sha)
if is_dirty:
version_string += ".dirty"
return version_string
from lbmpy.boundaries.boundaryconditions import (
UBB, FixedDensity, NeumannByCopy, NoSlip, StreamInConstant)
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling
__all__ = ['NoSlip', 'UBB', 'FixedDensity', 'NeumannByCopy', 'LatticeBoltzmannBoundaryHandling', 'StreamInConstant']
import sympy as sp
from lbmpy.boundaries.boundaryhandling import BoundaryOffsetInfo, LbmWeightInfo
from lbmpy.simplificationfactory import create_simplification_strategy
from pystencils import Assignment, Field
from pystencils.data_types import create_type
from pystencils.sympyextensions import get_symmetric_part
class Boundary:
"""Base class all boundaries should derive from"""
inner_or_boundary = True
single_link = False
def __init__(self, name=None):
self._name = name
def __call__(self, pdf_field, direction_symbol, lb_method, index_field):
"""
This function defines the boundary behavior and must therefore be implemented by all boundaries.
Here the boundary is defined as a list of sympy equations, from which a boundary kernel is generated.
Args:
pdf_field: pystencils field describing the pdf. The current cell is cell next to the boundary,
which is influenced by the boundary cell i.e. has a link from the boundary cell to
itself.
direction_symbol: a sympy symbol that can be used as index to the pdf_field. It describes
the direction pointing from the fluid to the boundary cell
lb_method: an instance of the LB method used. Use this to adapt the boundary to the method
(e.g. compressibility)
index_field: the boundary index field that can be used to retrieve and update boundary data
Returns:
:return: list of sympy equations
"""
raise NotImplementedError("Boundary class has to overwrite __call__")
@property
def additional_data(self):
"""Return a list of (name, type) tuples for additional data items required in this boundary
These data items can either be initialized in separate kernel see additional_data_kernel_init or by
Python callbacks - see additional_data_callback """
return []
@property
def additional_data_init_callback(self):
"""Return a callback function called with a boundary data setter object and returning a dict of
data-name to data for each element that should be initialized"""
return None
@property
def name(self):
if self._name:
return self._name
else:
return type(self).__name__
@name.setter
def name(self, new_value):
self._name = new_value
class NoSlip(Boundary):
def __init__(self, name=None):
"""Set an optional name here, to mark boundaries, for example for force evaluations"""
super(NoSlip, self).__init__(name)
"""No-Slip, (half-way) simple bounce back boundary condition, enforcing zero velocity at obstacle"""
def __call__(self, pdf_field, direction_symbol, lb_method, **kwargs):
neighbor = BoundaryOffsetInfo.offset_from_dir(direction_symbol, lb_method.dim)
inverse_dir = BoundaryOffsetInfo.inv_dir(direction_symbol)
return [Assignment(pdf_field[neighbor](inverse_dir), pdf_field(direction_symbol))]
def __hash__(self):
# All boundaries of these class behave equal -> should also be equal (as long as name is equal)
return hash(self.name)
def __eq__(self, other):
if not isinstance(other, NoSlip):
return False
return self.name == other.name
class UBB(Boundary):
"""Velocity bounce back boundary condition, enforcing specified velocity at obstacle"""
def __init__(self, velocity, adapt_velocity_to_force=False, dim=None, name=None):
"""
Args:
velocity: can either be a constant, an access into a field, or a callback function.
The callback functions gets a numpy record array with members, 'x','y','z', 'dir' (direction)
and 'velocity' which has to be set to the desired velocity of the corresponding link
adapt_velocity_to_force:
"""
super(UBB, self).__init__(name)
self._velocity = velocity
self._adaptVelocityToForce = adapt_velocity_to_force
if callable(self._velocity) and not dim:
raise ValueError("When using a velocity callback the dimension has to be specified with the dim parameter")
elif not callable(self._velocity):
dim = len(velocity)
self.dim = dim
@property
def additional_data(self):
if callable(self._velocity):
return [('vel_%d' % (i,), create_type("double")) for i in range(self.dim)]
else:
return []
@property
def additional_data_init_callback(self):
if callable(self._velocity):
return self._velocity
def __call__(self, pdf_field, direction_symbol, lb_method, index_field, **kwargs):
vel_from_idx_field = callable(self._velocity)
vel = [index_field('vel_%d' % (i,)) for i in range(self.dim)] if vel_from_idx_field else self._velocity
direction = direction_symbol
assert self.dim == lb_method.dim, "Dimension of UBB (%d) does not match dimension of method (%d)" \
% (self.dim, lb_method.dim)
neighbor = BoundaryOffsetInfo.offset_from_dir(direction, lb_method.dim)
inverse_dir = BoundaryOffsetInfo.inv_dir(direction)
velocity = tuple(v_i.get_shifted(*neighbor) if isinstance(v_i, Field.Access) and not vel_from_idx_field else v_i
for v_i in vel)
if self._adaptVelocityToForce:
cqc = lb_method.conserved_quantity_computation
shifted_vel_eqs = cqc.equilibrium_input_equations_from_init_values(velocity=velocity)
velocity = [eq.rhs for eq in shifted_vel_eqs.new_filtered(cqc.first_order_moment_symbols).main_assignments]
c_s_sq = sp.Rational(1, 3)
weight_of_direction = LbmWeightInfo.weight_of_direction
vel_term = 2 / c_s_sq * sum([d_i * v_i
for d_i, v_i in zip(neighbor, velocity)]) * weight_of_direction(direction)
# Better alternative: in conserved value computation
# rename what is currently called density to "virtual_density"
# provide a new quantity density, which is constant in case of incompressible models
if not lb_method.conserved_quantity_computation.zero_centered_pdfs:
cqc = lb_method.conserved_quantity_computation
density_symbol = sp.Symbol("rho")
pdf_field_accesses = [pdf_field(i) for i in range(len(lb_method.stencil))]
density_equations = cqc.output_equations_from_pdfs(pdf_field_accesses, {'density': density_symbol})
density_symbol = lb_method.conserved_quantity_computation.defined_symbols()['density']
result = density_equations.all_assignments
result += [Assignment(pdf_field[neighbor](inverse_dir),
pdf_field(direction) - vel_term * density_symbol)]
return result
else:
return [Assignment(pdf_field[neighbor](inverse_dir),
pdf_field(direction) - vel_term)]
class FixedDensity(Boundary):
def __init__(self, density, name=None):
if name is None:
name = "Fixed Density " + str(density)
super(FixedDensity, self).__init__(name)
self._density = density
def __call__(self, pdf_field, direction_symbol, lb_method, **kwargs):
"""Boundary condition that fixes the density/pressure at the obstacle"""
def remove_asymmetric_part_of_main_assignments(assignment_collection, degrees_of_freedom):
new_main_assignments = [Assignment(a.lhs, get_symmetric_part(a.rhs, degrees_of_freedom))
for a in assignment_collection.main_assignments]
return assignment_collection.copy(new_main_assignments)
neighbor = BoundaryOffsetInfo.offset_from_dir(direction_symbol, lb_method.dim)
inverse_dir = BoundaryOffsetInfo.inv_dir(direction_symbol)
cqc = lb_method.conserved_quantity_computation
velocity = cqc.defined_symbols()['velocity']
symmetric_eq = remove_asymmetric_part_of_main_assignments(lb_method.get_equilibrium(),
degrees_of_freedom=velocity)
substitutions = {sym: pdf_field(i) for i, sym in enumerate(lb_method.pre_collision_pdf_symbols)}
symmetric_eq = symmetric_eq.new_with_substitutions(substitutions)
simplification = create_simplification_strategy(lb_method)
symmetric_eq = simplification(symmetric_eq)
density_symbol = cqc.defined_symbols()['density']
density = self._density
equilibrium_input = cqc.equilibrium_input_equations_from_init_values(density=density)
equilibrium_input = equilibrium_input.new_without_subexpressions()
density_eq = equilibrium_input.main_assignments[0]
assert density_eq.lhs == density_symbol
transformed_density = density_eq.rhs
conditions = [(eq_i.rhs, sp.Equality(direction_symbol, i))
for i, eq_i in enumerate(symmetric_eq.main_assignments)] + [(0, True)]
eq_component = sp.Piecewise(*conditions)
subexpressions = [Assignment(eq.lhs, transformed_density if eq.lhs == density_symbol else eq.rhs)
for eq in symmetric_eq.subexpressions]
return subexpressions + [Assignment(pdf_field[neighbor](inverse_dir),
2 * eq_component - pdf_field(direction_symbol))]
class NeumannByCopy(Boundary):
def __call__(self, pdf_field, direction_symbol, lb_method, **kwargs):
neighbor = BoundaryOffsetInfo.offset_from_dir(direction_symbol, lb_method.dim)
inverse_dir = BoundaryOffsetInfo.inv_dir(direction_symbol)
return [Assignment(pdf_field[neighbor](inverse_dir), pdf_field(inverse_dir)),
Assignment(pdf_field[neighbor](direction_symbol), pdf_field(direction_symbol))]
def __hash__(self):
# All boundaries of these class behave equal -> should also be equal
return hash("NeumannByCopy")
def __eq__(self, other):
return type(other) == NeumannByCopy
class StreamInConstant(Boundary):
def __init__(self, constant, name=None):
super(StreamInConstant, self).__init__(name)
self._constant = constant
def __call__(self, pdf_field, direction_symbol, lb_method, **kwargs):
neighbor = BoundaryOffsetInfo.offset_from_dir(direction_symbol, lb_method.dim)
inverse_dir = BoundaryOffsetInfo.inv_dir(direction_symbol)
return [Assignment(pdf_field[neighbor](inverse_dir), self._constant),
Assignment(pdf_field[neighbor](direction_symbol), self._constant)]
def __hash__(self):
# All boundaries of these class behave equal -> should also be equal
return hash("StreamInConstant")
def __eq__(self, other):
return type(other) == StreamInConstant
import numpy as np
import sympy as sp
from pystencils import Assignment, TypedSymbol, create_indexed_kernel
from pystencils.backends.cbackend import CustomCodeNode
from pystencils.boundaries import BoundaryHandling
from pystencils.boundaries.boundaryhandling import BoundaryOffsetInfo
from pystencils.stencil import inverse_direction
class LatticeBoltzmannBoundaryHandling(BoundaryHandling):
def __init__(self, lb_method, data_handling, pdf_field_name, name="boundary_handling", flag_interface=None,
target='cpu', openmp=True):
self.lb_method = lb_method
super(LatticeBoltzmannBoundaryHandling, self).__init__(data_handling, pdf_field_name, lb_method.stencil,
name, flag_interface, target, openmp)
def force_on_boundary(self, boundary_obj):
from lbmpy.boundaries import NoSlip
if isinstance(boundary_obj, NoSlip):
return self._force_on_no_slip(boundary_obj)
else:
self.__call__()
return self._force_on_boundary(boundary_obj)
# ------------------------------ Implementation Details ------------------------------------------------------------
def _force_on_no_slip(self, boundary_obj):
dh = self._data_handling
ff_ghost_layers = dh.ghost_layers_of_field(self.flag_interface.flag_field_name)
method = self.lb_method
stencil = np.array(method.stencil)
result = np.zeros(self.dim)
for b in dh.iterate(ghost_layers=ff_ghost_layers):
obj_to_ind_list = b[self._index_array_name].boundary_object_to_index_list
pdf_array = b[self._field_name]
if boundary_obj in obj_to_ind_list:
ind_arr = obj_to_ind_list[boundary_obj]
indices = [ind_arr[name] for name in ('x', 'y', 'z')[:method.dim]]
indices.append(ind_arr['dir'])
values = 2 * pdf_array[tuple(indices)]
forces = stencil[ind_arr['dir']] * values[:, np.newaxis]
result += forces.sum(axis=0)
return dh.reduce_float_sequence(list(result), 'sum')
def _force_on_boundary(self, boundary_obj):
dh = self._data_handling
ff_ghost_layers = dh.ghost_layers_of_field(self.flag_interface.flag_field_name)
method = self.lb_method
stencil = np.array(method.stencil)
inv_direction = np.array([method.stencil.index(inverse_direction(d))
for d in method.stencil])
result = np.zeros(self.dim)
for b in dh.iterate(ghost_layers=ff_ghost_layers):
obj_to_ind_list = b[self._index_array_name].boundary_object_to_index_list
pdf_array = b[self._field_name]
if boundary_obj in obj_to_ind_list:
ind_arr = obj_to_ind_list[boundary_obj]
indices = [ind_arr[name] for name in ('x', 'y', 'z')[:method.dim]]
offsets = stencil[ind_arr['dir']]
inv_dir = inv_direction[ind_arr['dir']]
fluid_values = pdf_array[tuple(indices) + (ind_arr['dir'],)]
boundary_values = pdf_array[tuple(ind + offsets[:, i] for i, ind in enumerate(indices)) + (inv_dir,)]
values = fluid_values + boundary_values
forces = stencil[ind_arr['dir']] * values[:, np.newaxis]
result += forces.sum(axis=0)
return dh.reduce_float_sequence(list(result), 'sum')
def _create_boundary_kernel(self, symbolic_field, symbolic_index_field, boundary_obj):
return create_lattice_boltzmann_boundary_kernel(symbolic_field, symbolic_index_field, self.lb_method,
boundary_obj, target=self._target, openmp=self._openmp)
class LbmWeightInfo(CustomCodeNode):
# --------------------------- Functions to be used by boundaries --------------------------
@staticmethod
def weight_of_direction(dir_idx):
return sp.IndexedBase(LbmWeightInfo.WEIGHTS_SYMBOL, shape=(1,))[dir_idx]
# ---------------------------------- Internal ---------------------------------------------
WEIGHTS_SYMBOL = TypedSymbol("weights", "double")
def __init__(self, lb_method):
weights = [str(w.evalf()) for w in lb_method.weights]
w_sym = LbmWeightInfo.WEIGHTS_SYMBOL
code = "const double %s [] = { %s };\n" % (w_sym.name, ",".join(weights))
super(LbmWeightInfo, self).__init__(code, symbols_read=set(), symbols_defined={w_sym})
def create_lattice_boltzmann_boundary_kernel(pdf_field, index_field, lb_method, boundary_functor,
target='cpu', openmp=True):
elements = [BoundaryOffsetInfo(lb_method.stencil), LbmWeightInfo(lb_method)]
index_arr_dtype = index_field.dtype.numpy_dtype
dir_symbol = TypedSymbol("dir", index_arr_dtype.fields['dir'][0])
elements += [Assignment(dir_symbol, index_field[0]('dir'))]
elements += boundary_functor(pdf_field=pdf_field, direction_symbol=dir_symbol,
lb_method=lb_method, index_field=index_field)
return create_indexed_kernel(elements, [index_field], target=target, cpu_openmp=openmp)
# Workaround for cython bug
# see https://stackoverflow.com/questions/8024805/cython-compiled-c-extension-importerror-dynamic-module-does-not-define-init-fu
WORKAROUND = "Something"
import cython
ctypedef fused IntegerType:
short
int
long
long long
unsigned short
unsigned int
unsigned long
@cython.boundscheck(False)
@cython.wraparound(False)
def create_boundary_index_list_2d(object[IntegerType, ndim=2] flag_field,
int nr_of_ghost_layers, IntegerType boundary_mask, IntegerType fluid_mask,
object[int, ndim=2] stencil):
cdef int xs, ys, x, y
cdef int dirIdx, num_directions, dx, dy
xs, ys = flag_field.shape
boundary_index_list = []
num_directions = stencil.shape[0]
for y in range(nr_of_ghost_layers, ys - nr_of_ghost_layers):
for x in range(nr_of_ghost_layers, xs - nr_of_ghost_layers):
if flag_field[x, y] & fluid_mask:
for dirIdx in range(1, num_directions):
dx = stencil[dirIdx,0]
dy = stencil[dirIdx,1]
if flag_field[x + dx, y + dy] & boundary_mask:
boundary_index_list.append((x,y, dirIdx))
return boundary_index_list
@cython.boundscheck(False)
@cython.wraparound(False)
def create_boundary_index_list_3d(object[IntegerType, ndim=3] flag_field,
int nr_of_ghost_layers, IntegerType boundary_mask, IntegerType fluid_mask,
object[int, ndim=2] stencil):
cdef int xs, ys, zs, x, y, z
cdef int dirIdx, num_directions, dx, dy, dz
xs, ys, zs = flag_field.shape
boundary_index_list = []
num_directions = stencil.shape[0]
for z in range(nr_of_ghost_layers, zs - nr_of_ghost_layers):
for y in range(nr_of_ghost_layers, ys - nr_of_ghost_layers):
for x in range(nr_of_ghost_layers, xs - nr_of_ghost_layers):
if flag_field[x, y, z] & fluid_mask:
for dirIdx in range(1, num_directions):
dx = stencil[dirIdx,0]
dy = stencil[dirIdx,1]
dz = stencil[dirIdx,2]
if flag_field[x + dx, y + dy, z + dz] & boundary_mask:
boundary_index_list.append((x,y,z, dirIdx))
return boundary_index_list
r"""
Creating LBM kernels
====================
The following list describes common parameters for the creation functions. They have to be passed as keyword parameters.
Method parameters
^^^^^^^^^^^^^^^^^
General:
- ``stencil='D2Q9'``: stencil name e.g. 'D2Q9', 'D3Q19'. See :func:`pystencils.stencils.get_stencil` for details
- ``method='srt'``: name of lattice Boltzmann method. This determines the selection and relaxation pattern of
moments/cumulants, i.e. which moment/cumulant basis is chosen, and which of the basis vectors are relaxed together
- ``srt``: single relaxation time (:func:`lbmpy.methods.create_srt`)
- ``trt``: two relaxation time, first relaxation rate is for even moments and determines the viscosity (as in SRT),
the second relaxation rate is used for relaxing odd moments, and controls the bulk viscosity.
(:func:`lbmpy.methods.create_trt`)
- ``mrt``: orthogonal multi relaxation time model, number of relaxation rates depends on the stencil
(:func:`lbmpy.methods.create_mrt_orthogonal`)
- ``mrt3``: three relaxation time method, where shear moments are relaxed with first relaxation rate (and therefore
determine viscosity, second rate relaxes the shear tensor trace (determines bulk viscosity) and last rate relaxes
all other, higher order moments. If two relaxation rates are chosen the same this is equivalent to a KBC type
relaxation (:func:`lbmpy.methods.create_mrt3`)
- ``mrt_raw``: non-orthogonal MRT where all relaxation rates can be specified independently i.e. there are as many
relaxation rates as stencil entries. Look at the generated method in Jupyter to see which moment<->relaxation rate
mapping (:func:`lbmpy.methods.create_mrt_raw`)
- ``trt-kbc-n<N>`` where <N> is 1,2,3 or 4. Special two-relaxation rate method. This is not the entropic method
yet, only the relaxation pattern. To get the entropic method, see parameters below!
(:func:`lbmpy.methods.create_trt_kbc`)
- ``relaxation_rates``: sequence of relaxation rates, number depends on selected method. If you specify more rates than
method needs, the additional rates are ignored. For SRT and TRT models it is possible ot define a single
``relaxation_rate`` instead of a list, the second rate for TRT is then determined via magic number.
- ``compressible=False``: affects the selection of equilibrium moments. Both options approximate the *incompressible*
Navier Stokes Equations. However when chosen as False, the approximation is better, the standard LBM derivation is
compressible.
- ``equilibrium_order=2``: order in velocity, at which the equilibrium moment/cumulant approximation is
truncated. Order 2 is sufficient to approximate Navier-Stokes
- ``force_model=None``: possible values: ``None``, ``'simple'``, ``'luo'``, ``'guo'`` ``'buick'``, or an instance of a
class implementing the same methods as the classes in :mod:`lbmpy.forcemodels`. For details, see
:mod:`lbmpy.forcemodels`
- ``force=(0,0,0)``: either constant force or a symbolic expression depending on field value
- ``maxwellian_moments=True``: way to compute equilibrium moments/cumulants, if False the standard
discretized LBM equilibrium is used, otherwise the equilibrium moments are computed from the continuous Maxwellian.
This makes only a difference if sparse stencils are used e.g. D2Q9 and D3Q27 are not affected, D319 and DQ15 are
- ``cumulant=False``: use cumulants instead of moments
- ``initial_velocity=None``: initial velocity in domain, can either be a tuple (x,y,z) velocity to set a constant
velocity everywhere, or a numpy array with the same size of the domain, with a last coordinate of shape dim to set
velocities on cell level
- ``output={}``: a dictionary mapping macroscopic quantites e.g. the strings 'density' and 'velocity' to pystencils
fields. In each timestep the corresponding quantities are written to the given fields.
- ``velocity_input``: symbolic field where the velocities are read from (for advection diffusion LBM)
- ``density_input``: symbolic field or field access where to read density from. When passing this parameter,
``velocity_input`` has to be passed as well
- ``kernel_type``: supported values: 'stream_pull_collide' (default), 'collide_only'
Entropic methods:
- ``entropic=False``: In case there are two distinct relaxation rate in a method, one of them (usually the one, not
determining the viscosity) can be automatically chosen w.r.t an entropy condition. For details see
:mod:`lbmpy.methods.entropic`
- ``entropic_newton_iterations=None``: For moment methods the entropy optimum can be calculated in closed form.
For cumulant methods this is not possible, in that case it is computed using Newton iterations. This parameter can be
used to force Newton iterations and specify how many should be done
- ``omega_output_field=None``: you can pass a pystencils Field here, where the calculated free relaxation rate of
an entropic or Smagorinsky method is written to
LES methods:
- ``smagorinsky=False``: set to Smagorinsky constant to activate turbulence model, ``omega_output_field`` can be set to
write out adapted relaxation rates
Fluctuating LB:
- ``fluctuating``: enables fluctuating lattice Boltzmann by randomizing collision process.
Pass dictionary with parameters to ``lbmpy.fluctuatinglb.add_fluctuations_to_collision_rule``
Optimization Parameters
^^^^^^^^^^^^^^^^^^^^^^^
Simplifications / Transformations:
- ``cse_pdfs=False``: run common subexpression elimination for opposing stencil directions
- ``cse_global=False``: run common subexpression elimination after all other simplifications have been executed
- ``split=False``: split innermost loop, to handle only 2 directions per loop. This reduces the number of parallel
load/store streams and thus speeds up the kernel on most architectures
- ``builtin_periodicity=(False,False,False)``: instead of handling periodicity by copying ghost layers, the periodicity
is built into the kernel. This parameters specifies if the domain is periodic in (x,y,z) direction. Even if the
periodicity is built into the kernel, the fields have one ghost layer to be consistent with other functions.
Field size information:
- ``pdf_arr=None``: pass a numpy array here to create kernels with fixed size and create the loop nest according
to layout of this array
- ``field_size=None``: create kernel for fixed field size
- ``field_layout='c'``: ``'c'`` or ``'numpy'`` for standard numpy layout, ``'reverse_numpy'`` or ``'f'`` for fortran
layout, this does not apply when pdf_arr was given, then the same layout as pdf_arr is used
- ``symbolic_field``: pystencils field for source (pdf field that is read)
- ``symbolic_temporary_field``: pystencils field for temporary (pdf field that is written in stream, or stream-collide)
CPU:
- ``openmp=True``: Can be a boolean to turn multi threading on/off, or an integer
specifying the number of threads. If True is specified OpenMP chooses the number of threads
- ``vectorization=False``: controls manual vectorization using SIMD instrinsics. If True default vectorization settings
are use. Alternatively a dictionary with parameters for vectorize function can be passed. For example
``{'instruction_set': 'avx', 'assume_aligned': True, 'nontemporal': True}``. Nontemporal stores are only used if
assume_aligned is also activated.
GPU:
- ``target='cpu'``: ``'cpu'`` or ``'gpu'``, last option requires a CUDA enabled graphics card
and installed *pycuda* package
- ``gpu_indexing='block'``: determines mapping of CUDA threads to cells. Can be either ``'block'`` or ``'line'``
- ``gpu_indexing_params='block'``: parameters passed to init function of gpu indexing.
For ``'block'`` indexing one can e.g. specify the block size ``{'block_size' : (128, 4, 1)}``
Other:
- ``openmp=True``: only applicable for cpu simulations. Can be a boolean to turn multi threading on/off, or an integer
specifying the number of threads. If True is specified OpenMP chooses the number of threads
- ``double_precision=True``: by default simulations run with double precision floating point numbers, by setting this
parameter to False, single precision is used, which is much faster, especially on GPUs
Terminology and creation pipeline
---------------------------------
Kernel functions are created in three steps:
1. *Method*:
the method defines the collision process. Currently there are two big categories:
moment and cumulant based methods. A method defines how each moment or cumulant is relaxed by
storing the equilibrium value and the relaxation rate for each moment/cumulant.
2. *Collision/Update Rule*:
Methods can generate a "collision rule" which is an equation collection that define the
post collision values as a function of the pre-collision values. On these equation collection
simplifications are applied to reduce the number of floating point operations.
At this stage an entropic optimization step can also be added to determine one relaxation rate by an
entropy condition.
Then a streaming rule is added which transforms the collision rule into an update rule.
The streaming step depends on the pdf storage (source/destination, AABB pattern, EsoTwist).
Currently only the simple source/destination pattern is supported.
3. *AST*:
The abstract syntax tree describes the structure of the kernel, including loops and conditionals.
The ast can be modified e.g. to add OpenMP pragmas, reorder loops or apply other optimizations.
4. *Function*:
This step compiles the AST into an executable function, either for CPU or GPUs. This function
behaves like a normal Python function and runs one LBM time step.
The function :func:`create_lb_function` runs the whole pipeline, the other functions in this module
execute this pipeline only up to a certain step. Each function optionally also takes the result of the previous step.
For example, to modify the AST one can run::
ast = create_lb_ast(...)
# modify ast here
func = create_lb_function(ast=ast, ...)
"""
from copy import copy
import sympy as sp
import lbmpy.forcemodels as forcemodels
from lbmpy.fieldaccess import (
AAEvenTimeStepAccessor, AAOddTimeStepAccessor, CollideOnlyInplaceAccessor,
EsoTwistEvenTimeStepAccessor, EsoTwistOddTimeStepAccessor, PdfFieldAccessor,
PeriodicTwoFieldsAccessor, StreamPullTwoFieldsAccessor, StreamPushTwoFieldsAccessor)
from lbmpy.fluctuatinglb import add_fluctuations_to_collision_rule
from lbmpy.methods import (
create_mrt3, create_mrt_orthogonal, create_mrt_raw, create_srt, create_trt, create_trt_kbc)
from lbmpy.methods.creationfunctions import create_generic_mrt
from lbmpy.methods.cumulantbased import CumulantBasedLbMethod
from lbmpy.methods.entropic import add_entropy_condition, add_iterative_entropy_condition
from lbmpy.methods.entropic_eq_srt import create_srt_entropic
from lbmpy.relaxationrates import relaxation_rate_from_magic_number
from lbmpy.simplificationfactory import create_simplification_strategy
from lbmpy.stencils import get_stencil
from lbmpy.turbulence_models import add_smagorinsky_model
from lbmpy.updatekernels import create_lbm_kernel, create_stream_pull_with_output_kernel
from pystencils import Assignment, AssignmentCollection, create_kernel
from pystencils.cache import disk_cache_no_fallback
from pystencils.data_types import collate_types
from pystencils.field import Field, get_layout_of_array
from pystencils.stencil import have_same_entries
def create_lb_function(ast=None, optimization={}, **kwargs):
"""Creates a Python function for the LB method"""
params, opt_params = update_with_default_parameters(kwargs, optimization)
if ast is None:
params['optimization'] = opt_params
ast = create_lb_ast(**params)
res = ast.compile()
res.method = ast.method
res.update_rule = ast.update_rule
res.ast = ast
return res
def create_lb_ast(update_rule=None, optimization={}, **kwargs):
"""Creates a pystencils AST for the LB method"""
params, opt_params = update_with_default_parameters(kwargs, optimization)
if update_rule is None:
params['optimization'] = optimization
update_rule = create_lb_update_rule(**params)
field_types = set(fa.field.dtype for fa in update_rule.defined_symbols if isinstance(fa, Field.Access))
res = create_kernel(update_rule, target=opt_params['target'], data_type=collate_types(field_types),
cpu_openmp=opt_params['openmp'], cpu_vectorize_info=opt_params['vectorization'],
gpu_indexing=opt_params['gpu_indexing'], gpu_indexing_params=opt_params['gpu_indexing_params'],
ghost_layers=1)
res.method = update_rule.method
res.update_rule = update_rule
return res
@disk_cache_no_fallback
def create_lb_update_rule(collision_rule=None, optimization={}, **kwargs):
"""Creates an update rule (list of Assignments) for a LB method that describe a full sweep"""
params, opt_params = update_with_default_parameters(kwargs, optimization)
if collision_rule is None:
collision_rule = create_lb_collision_rule(**params, optimization=opt_params)
lb_method = collision_rule.method
field_data_type = 'float64' if opt_params['double_precision'] else 'float32'
q = len(collision_rule.method.stencil)
if opt_params['symbolic_field'] is not None:
src_field = opt_params['symbolic_field']
elif opt_params['field_size']:
field_size = [s + 2 for s in opt_params['field_size']] + [q]
src_field = Field.create_fixed_size(params['field_name'], field_size, index_dimensions=1,
layout=opt_params['field_layout'], dtype=field_data_type)
else:
src_field = Field.create_generic(params['field_name'], spatial_dimensions=collision_rule.method.dim,
index_shape=(q,), layout=opt_params['field_layout'], dtype=field_data_type)
if opt_params['symbolic_temporary_field'] is not None:
dst_field = opt_params['symbolic_temporary_field']
else:
dst_field = src_field.new_field_with_different_name(params['temporary_field_name'])
kernel_type = params['kernel_type']
if isinstance(kernel_type, PdfFieldAccessor):
accessor = kernel_type
return create_lbm_kernel(collision_rule, src_field, dst_field, accessor)
elif params['kernel_type'] == 'stream_pull_collide':
accessor = StreamPullTwoFieldsAccessor
if any(opt_params['builtin_periodicity']):
accessor = PeriodicTwoFieldsAccessor(opt_params['builtin_periodicity'], ghost_layers=1)
return create_lbm_kernel(collision_rule, src_field, dst_field, accessor)
elif params['kernel_type'] == 'stream_pull_only':
return create_stream_pull_with_output_kernel(lb_method, src_field, dst_field, params['output'])
else:
kernel_type_to_accessor = {
'collide_only': CollideOnlyInplaceAccessor,
'collide_stream_push': StreamPushTwoFieldsAccessor,
'esotwist_even': EsoTwistEvenTimeStepAccessor,
'esotwist_odd': EsoTwistOddTimeStepAccessor,
'aa_even': AAEvenTimeStepAccessor,
'aa_odd': AAOddTimeStepAccessor,
}
try:
accessor = kernel_type_to_accessor[kernel_type]()
except KeyError:
raise ValueError("Invalid value of parameter 'kernel_type'", params['kernel_type'])
return create_lbm_kernel(collision_rule, src_field, dst_field, accessor)
@disk_cache_no_fallback
def create_lb_collision_rule(lb_method=None, optimization={}, **kwargs):
"""Creates a collision rule (list of Assignments) for a LB method describing the collision operator (no stream)"""
params, opt_params = update_with_default_parameters(kwargs, optimization)
if lb_method is None:
lb_method = create_lb_method(**params)
split_inner_loop = 'split' in opt_params and opt_params['split']
simplification = create_simplification_strategy(lb_method, cse_pdfs=False, cse_global=False,
split_inner_loop=split_inner_loop)
cqc = lb_method.conserved_quantity_computation
rho_in = params['density_input']
u_in = params['velocity_input']
if u_in is not None and isinstance(u_in, Field):
u_in = u_in.center_vector
if rho_in is not None and isinstance(rho_in, Field):
rho_in = rho_in.center
if u_in is not None:
density_rhs = sum(lb_method.pre_collision_pdf_symbols) if rho_in is None else rho_in
eqs = [Assignment(cqc.zeroth_order_moment_symbol, density_rhs)]
eqs += [Assignment(u_sym, u_in[i]) for i, u_sym in enumerate(cqc.first_order_moment_symbols)]
eqs = AssignmentCollection(eqs, [])
collision_rule = lb_method.get_collision_rule(conserved_quantity_equations=eqs)
elif u_in is None and rho_in is not None:
raise ValueError("When setting 'density_input' parameter, 'velocity_input' has to be specified as well.")
else:
collision_rule = lb_method.get_collision_rule()
collision_rule = simplification(collision_rule)
if params['fluctuating']:
add_fluctuations_to_collision_rule(collision_rule, **params['fluctuating'])
if params['entropic']:
if params['smagorinsky']:
raise ValueError("Choose either entropic or smagorinsky")
if params['entropic_newton_iterations']:
if isinstance(params['entropic_newton_iterations'], bool):
iterations = 3
else:
iterations = params['entropic_newton_iterations']
collision_rule = add_iterative_entropy_condition(collision_rule, newton_iterations=iterations,
omega_output_field=params['omega_output_field'])
else:
collision_rule = add_entropy_condition(collision_rule, omega_output_field=params['omega_output_field'])
elif params['smagorinsky']:
smagorinsky_constant = 0.12 if params['smagorinsky'] is True else params['smagorinsky']
collision_rule = add_smagorinsky_model(collision_rule, smagorinsky_constant,
omega_output_field=params['omega_output_field'])
if 'split_groups' in collision_rule.simplification_hints:
collision_rule.simplification_hints['split_groups'][0].append(sp.Symbol("smagorinsky_omega"))
cse_pdfs = False if 'cse_pdfs' not in opt_params else opt_params['cse_pdfs']
cse_global = False if 'cse_global' not in opt_params else opt_params['cse_global']
if cse_pdfs:
from lbmpy.methods.momentbasedsimplifications import cse_in_opposing_directions
collision_rule = cse_in_opposing_directions(collision_rule)
if cse_global:
from pystencils.simp import sympy_cse
collision_rule = sympy_cse(collision_rule)
if params['output'] and params['kernel_type'] == 'stream_pull_collide':
cqc = lb_method.conserved_quantity_computation
output_eqs = cqc.output_equations_from_pdfs(lb_method.pre_collision_pdf_symbols, params['output'])
collision_rule = collision_rule.new_merged(output_eqs)
return collision_rule
def create_lb_method(**params):
"""Creates a LB method, defined by moments/cumulants for collision space, equilibrium and relaxation rates."""
params, _ = update_with_default_parameters(params, {}, fail_on_unknown_parameter=False)
if isinstance(params['stencil'], tuple) or isinstance(params['stencil'], list):
stencil_entries = params['stencil']
else:
stencil_entries = get_stencil(params['stencil'])
dim = len(stencil_entries[0])
if isinstance(params['force'], Field):
params['force'] = tuple(params['force'](i) for i in range(dim))
force_is_zero = True
for f_i in params['force']:
if f_i != 0:
force_is_zero = False
no_force_model = 'force_model' not in params or params['force_model'] == 'none' or params['force_model'] is None
if not force_is_zero and no_force_model:
params['force_model'] = 'guo'
if 'force_model' in params:
force_model = force_model_from_string(params['force_model'], params['force'][:dim])
else:
force_model = None
common_params = {
'compressible': params['compressible'],
'equilibrium_order': params['equilibrium_order'],
'force_model': force_model,
'maxwellian_moments': params['maxwellian_moments'],
'cumulant': params['cumulant'],
'c_s_sq': params['c_s_sq'],
}
method_name = params['method']
relaxation_rates = params['relaxation_rates']
if method_name.lower() == 'srt':
assert len(relaxation_rates) >= 1, "Not enough relaxation rates"
method = create_srt(stencil_entries, relaxation_rates[0], **common_params)
elif method_name.lower() == 'trt':
assert len(relaxation_rates) >= 2, "Not enough relaxation rates"
method = create_trt(stencil_entries, relaxation_rates[0], relaxation_rates[1], **common_params)
elif method_name.lower() == 'mrt':
next_relaxation_rate = [0]
def relaxation_rate_getter(_):
try:
res = relaxation_rates[next_relaxation_rate[0]]
next_relaxation_rate[0] += 1
except IndexError:
raise ValueError("Too few relaxation rates specified")
return res
method = create_mrt_orthogonal(stencil_entries, relaxation_rate_getter, **common_params)
elif method_name.lower() == 'mrt_raw':
method = create_mrt_raw(stencil_entries, relaxation_rates, **common_params)
elif method_name.lower() == 'mrt3':
method = create_mrt3(stencil_entries, relaxation_rates, **common_params)
elif method_name.lower().startswith('trt-kbc-n'):
if have_same_entries(stencil_entries, get_stencil("D2Q9")):
dim = 2
elif have_same_entries(stencil_entries, get_stencil("D3Q27")):
dim = 3
else:
raise NotImplementedError("KBC type TRT methods can only be constructed for D2Q9 and D3Q27 stencils")
method_nr = method_name[-1]
method = create_trt_kbc(dim, relaxation_rates[0], relaxation_rates[1], 'KBC-N' + method_nr, **common_params)
elif method_name.lower() == 'entropic-srt':
method = create_srt_entropic(stencil_entries, relaxation_rates[0], force_model, params['compressible'])
else:
raise ValueError("Unknown method %s" % (method_name,))
return method
def create_lb_method_from_existing(method, modification_function):
"""Creates a new method based on an existing method by modifying its collision table.
Args:
method: old method
modification_function: function receiving (moment, equilibrium_value, relaxation_rate) as arguments,
i.e. one row of the relaxation table, returning a modified version
"""
relaxation_table = (modification_function(m, eq, rr)
for m, eq, rr in zip(method.moments, method.moment_equilibrium_values, method.relaxation_rates))
compressible = method.conserved_quantity_computation.compressible
cumulant = isinstance(method, CumulantBasedLbMethod)
return create_generic_mrt(method.stencil, relaxation_table, compressible, method.force_model, cumulant)
# ----------------------------------------------------------------------------------------------------------------------
def force_model_from_string(force_model_name, force_values):
if type(force_model_name) is not str:
return force_model_name
if force_model_name == 'none':
return None
force_model_dict = {
'simple': forcemodels.Simple,
'luo': forcemodels.Luo,
'guo': forcemodels.Guo,
'buick': forcemodels.Buick,
'silva': forcemodels.Buick,
'edm': forcemodels.EDM,
}
if force_model_name.lower() not in force_model_dict:
raise ValueError("Unknown force model %s" % (force_model_name,))
force_model_class = force_model_dict[force_model_name.lower()]
return force_model_class(force_values)
def switch_to_symbolic_relaxation_rates_for_omega_adapting_methods(method_parameters, kernel_params, force=False):
"""
For entropic kernels the relaxation rate has to be a variable. If a constant was passed a
new dummy variable is inserted and the value of this variable is later on passed to the kernel
"""
if method_parameters['entropic'] or method_parameters['smagorinsky'] or force:
value_to_symbol_map = {}
new_relaxation_rates = []
for rr in method_parameters['relaxation_rates']:
if not isinstance(rr, sp.Symbol):
if rr not in value_to_symbol_map:
value_to_symbol_map[rr] = sp.Dummy()
dummy_var = value_to_symbol_map[rr]
new_relaxation_rates.append(dummy_var)
kernel_params[dummy_var.name] = rr
else:
new_relaxation_rates.append(rr)
if len(new_relaxation_rates) < 2:
new_relaxation_rates.append(sp.Dummy())
method_parameters['relaxation_rates'] = new_relaxation_rates
def update_with_default_parameters(params, opt_params=None, fail_on_unknown_parameter=True):
opt_params = opt_params if opt_params is not None else {}
default_method_description = {
'stencil': 'D2Q9',
'method': 'srt', # can be srt, trt or mrt
'relaxation_rates': None,
'compressible': False,
'equilibrium_order': 2,
'c_s_sq': sp.Rational(1, 3),
'force_model': 'none',
'force': (0, 0, 0),
'maxwellian_moments': True,
'cumulant': False,
'initial_velocity': None,
'entropic': False,
'entropic_newton_iterations': None,
'omega_output_field': None,
'smagorinsky': False,
'fluctuating': False,
'temperature': None,
'output': {},
'velocity_input': None,
'density_input': None,
'kernel_type': 'stream_pull_collide',
'field_name': 'src',
'temporary_field_name': 'dst',
'lb_method': None,
'collision_rule': None,
'update_rule': None,
'ast': None,
}
default_optimization_description = {
'cse_pdfs': False,
'cse_global': False,
'split': False,
'field_size': None,
'field_layout': 'fzyx', # can be 'numpy' (='c'), 'reverse_numpy' (='f'), 'fzyx', 'zyxf'
'symbolic_field': None,
'symbolic_temporary_field': None,
'target': 'cpu',
'openmp': False,
'double_precision': True,
'gpu_indexing': 'block',
'gpu_indexing_params': {},
'vectorization': None,
'builtin_periodicity': (False, False, False),
}
if 'relaxation_rate' in params:
if 'relaxation_rates' not in params:
if 'entropic' in params and params['entropic']:
params['relaxation_rates'] = [params['relaxation_rate']]
else:
params['relaxation_rates'] = [params['relaxation_rate'],
relaxation_rate_from_magic_number(params['relaxation_rate'])]
del params['relaxation_rate']
if 'pdf_arr' in opt_params:
arr = opt_params['pdf_arr']
opt_params['field_size'] = tuple(e - 2 for e in arr.shape[:-1])
opt_params['field_layout'] = get_layout_of_array(arr)
del opt_params['pdf_arr']
if fail_on_unknown_parameter:
unknown_params = [k for k in params.keys() if k not in default_method_description]
unknown_opt_params = [k for k in opt_params.keys() if k not in default_optimization_description]
if unknown_params:
raise ValueError("Unknown parameter(s): " + ", ".join(unknown_params))
if unknown_opt_params:
raise ValueError("Unknown optimization parameter(s): " + ",".join(unknown_opt_params))
params_result = copy(default_method_description)
params_result.update(params)
opt_params_result = copy(default_optimization_description)
opt_params_result.update(opt_params)
if params_result['relaxation_rates'] is None:
stencil_param = params_result['stencil']
if isinstance(stencil_param, tuple) or isinstance(stencil_param, list):
stencil_entries = stencil_param
else:
stencil_entries = get_stencil(params_result['stencil'])
params_result['relaxation_rates'] = sp.symbols("omega_:%d" % len(stencil_entries))
return params_result, opt_params_result
r"""
.. module:: forcemodels
:synopsis: Collection of forcing terms for hydrodynamic LBM simulations
Get started:
------------
This module offers different models to introduce a body force in the lattice Boltzmann scheme.
If you don't know which model to choose, use :class:`lbmpy.forcemodels.Guo`.
For incompressible collision models the :class:`lbmpy.forcemodels.Buick` model can be better.
Detailed information:
---------------------
Force models add a term :math:`C_F` to the collision equation:
.. math ::
f(\pmb{x} + c_q \Delta t, t + \Delta t) - f(\pmb{x},t) = \Omega(f, f^{(eq)})
+ \underbrace{F_q}_{\mbox{forcing term}}
The form of this term depends on the concrete force model: the first moment of this forcing term is equal
to the acceleration :math:`\pmb{a}` for all force models.
.. math ::
\sum_q \pmb{c}_q F_q = \pmb{a}
The second order moment is different for the forcing models - if it is zero the model is suited for
incompressible flows. For weakly compressible collision operators a force model with a corrected second order moment
should be chosen.
.. math ::
\sum_q c_{qi} c_{qj} f_q = F_i u_j + F_j u_i \hspace{1cm} \mbox{for Guo, Luo models}
\sum_q c_{qi} c_{qj} f_q = 0 \hspace{1cm} \mbox{for Simple, Buick}
Models with zero second order moment have:
.. math ::
F_q = \frac{w_q}{c_s^2} c_{qi} \; a_i
Models with nonzero second moment have:
.. math ::
F_q = \frac{w_q}{c_s^2} c_{qi} \; a_i + \frac{w_q}{c_s^4} (c_{qi} c_{qj} - c_s^2 \delta_{ij} ) u_j \, a_i
For all force models the computation of the macroscopic velocity has to be adapted (shifted) by adding a term
:math:`S_{macro}` that we call "macroscopic velocity shift"
.. math ::
\pmb{u} = \sum_q \pmb{c}_q f_q + S_{macro}
S_{macro} = \frac{\Delta t}{2} \sum_q F_q
Some models also shift the velocity entering the equilibrium distribution.
Comparison
----------
Force models can be distinguished by 2 options:
**Option 1**:
:math:`C_F = 1` and equilibrium velocity is not shifted, or :math:`C_F=(1 - \frac{\omega}{2})`
and equilibrum is shifted.
**Option 2**:
second velocity moment is zero or :math:`F_i u_j + F_j u_i`
===================== ==================== =================
Option2 \\ Option1 no equilibrium shift equilibrium shift
===================== ==================== =================
second moment zero :class:`Simple` :class:`Buick`
second moment nonzero :class:`Luo` :class:`Guo`
===================== ==================== =================
"""
import sympy as sp
from lbmpy.relaxationrates import get_shear_relaxation_rate
class Simple:
r"""
A simple force model which introduces the following additional force term in the
collision process :math:`\frac{w_q}{c_s^2} c_{qi} \; a_i` (often: force = rho * acceleration)
Should only be used with constant forces!
Shifts the macroscopic velocity by F/2, but does not change the equilibrium velocity.
"""
def __init__(self, force):
self._force = force
def __call__(self, lb_method, **kwargs):
assert len(self._force) == lb_method.dim
def scalar_product(a, b):
return sum(a_i * b_i for a_i, b_i in zip(a, b))
return [3 * w_i * scalar_product(self._force, direction)
for direction, w_i in zip(lb_method.stencil, lb_method.weights)]
def macroscopic_velocity_shift(self, density):
return default_velocity_shift(density, self._force)
class Luo:
r"""Force model by Luo :cite:`luo1993lattice`.
Shifts the macroscopic velocity by F/2, but does not change the equilibrium velocity.
"""
def __init__(self, force):
self._force = force
def __call__(self, lb_method):
u = sp.Matrix(lb_method.first_order_equilibrium_moment_symbols)
force = sp.Matrix(self._force)
result = []
for direction, w_i in zip(lb_method.stencil, lb_method.weights):
direction = sp.Matrix(direction)
result.append(3 * w_i * force.dot(direction - u + 3 * direction * direction.dot(u)))
return result
def macroscopic_velocity_shift(self, density):
return default_velocity_shift(density, self._force)
class Guo:
r"""
Force model by Guo :cite:`guo2002discrete`
Adapts the calculation of the macroscopic velocity as well as the equilibrium velocity (both shifted by F/2)!
"""
def __init__(self, force):
self._force = force
def __call__(self, lb_method):
luo = Luo(self._force)
shear_relaxation_rate = get_shear_relaxation_rate(lb_method)
correction_factor = (1 - sp.Rational(1, 2) * shear_relaxation_rate)
return [correction_factor * t for t in luo(lb_method)]
def macroscopic_velocity_shift(self, density):
return default_velocity_shift(density, self._force)
def equilibrium_velocity_shift(self, density):
return default_velocity_shift(density, self._force)
class Buick:
r"""
This force model :cite:`buick2000gravity` has a force term with zero second moment.
It is suited for incompressible lattice models.
"""
def __init__(self, force):
self._force = force
def __call__(self, lb_method, **kwargs):
simple = Simple(self._force)
shear_relaxation_rate = get_shear_relaxation_rate(lb_method)
correction_factor = (1 - sp.Rational(1, 2) * shear_relaxation_rate)
return [correction_factor * t for t in simple(lb_method)]
def macroscopic_velocity_shift(self, density):
return default_velocity_shift(density, self._force)
def equilibrium_velocity_shift(self, density):
return default_velocity_shift(density, self._force)
class EDM:
r"""Exact differencing force model"""
def __init__(self, force):
self._force = force
def __call__(self, lb_method, **kwargs):
cqc = lb_method.conserved_quantity_computation
rho = cqc.zeroth_order_moment_symbol if cqc.compressible else 1
u = cqc.first_order_moment_symbols
shifted_u = (u_i + f_i / rho for u_i, f_i in zip(u, self._force))
eq_terms = lb_method.get_equilibrium_terms()
shifted_eq = eq_terms.subs({u_i: su_i for u_i, su_i in zip(u, shifted_u)})
return shifted_eq - eq_terms
def macroscopic_velocity_shift(self, density):
return default_velocity_shift(density, self._force)
# -------------------------------- Helper functions ------------------------------------------------------------------
def default_velocity_shift(density, force):
return [f_i / (2 * density) for f_i in force]
from lbmpy.methods.creationfunctions import (
create_mrt3, create_mrt_orthogonal, create_mrt_raw, create_srt, create_trt, create_trt_kbc,
create_trt_with_magic_number, create_with_continuous_maxwellian_eq_moments,
create_with_discrete_maxwellian_eq_moments)
from lbmpy.methods.momentbased import (
AbstractConservedQuantityComputation, AbstractLbMethod, MomentBasedLbMethod, RelaxationInfo)
from .conservedquantitycomputation import DensityVelocityComputation
__all__ = ['RelaxationInfo', 'AbstractLbMethod',
'AbstractConservedQuantityComputation', 'DensityVelocityComputation', 'MomentBasedLbMethod',
'create_srt', 'create_trt', 'create_trt_with_magic_number', 'create_trt_kbc',
'create_mrt_orthogonal', 'create_mrt_raw', 'create_mrt3',
'create_with_continuous_maxwellian_eq_moments', 'create_with_discrete_maxwellian_eq_moments']
import itertools
import operator
from collections import OrderedDict
from functools import reduce
from warnings import warn
import sympy as sp
from lbmpy.maxwellian_equilibrium import (
compressible_to_incompressible_moment_value, get_cumulants_of_continuous_maxwellian_equilibrium,
get_cumulants_of_discrete_maxwellian_equilibrium,
get_moments_of_continuous_maxwellian_equilibrium,
get_moments_of_discrete_maxwellian_equilibrium)
from lbmpy.methods.abstractlbmethod import RelaxationInfo
from lbmpy.methods.conservedquantitycomputation import DensityVelocityComputation
from lbmpy.methods.cumulantbased import CumulantBasedLbMethod
from lbmpy.methods.momentbased import MomentBasedLbMethod
from lbmpy.moments import (
MOMENT_SYMBOLS, discrete_moment, exponents_to_polynomial_representations,
get_default_moment_set_for_stencil, get_order, gram_schmidt, is_even, moments_of_order,
moments_up_to_component_order, sort_moments_into_groups_of_same_order)
from lbmpy.relaxationrates import default_relaxation_rate_names, relaxation_rate_from_magic_number
from lbmpy.stencils import get_stencil
from pystencils.stencil import have_same_entries
from pystencils.sympyextensions import common_denominator
def create_with_discrete_maxwellian_eq_moments(stencil, moment_to_relaxation_rate_dict, compressible=False,
force_model=None, equilibrium_order=2,
cumulant=False, c_s_sq=sp.Rational(1, 3)):
r"""Creates a moment-based LBM by taking a list of moments with corresponding relaxation rate.
These moments are relaxed against the moments of the discrete Maxwellian distribution.
Args:
stencil: nested tuple defining the discrete velocity space. See `get_stencil`
moment_to_relaxation_rate_dict: dict that has as many entries as the stencil. Each moment, which can be
represented by an exponent tuple or in polynomial form
(see `lbmpy.moments`), is mapped to a relaxation rate.
compressible: incompressible LBM methods split the density into :math:`\rho = \rho_0 + \Delta \rho`
where :math:`\rho_0` is chosen as one, and the first moment of the pdfs is :math:`\Delta \rho` .
This approximates the incompressible Navier-Stokes equations better than the standard
compressible model.
force_model: force model instance, or None if no external forces
equilibrium_order: approximation order of macroscopic velocity :math:`\mathbf{u}` in the equilibrium
cumulant: if True relax cumulants instead of moments
c_s_sq: Speed of sound squared
Returns:
`lbmpy.methods.MomentBasedLbMethod` instance
"""
if isinstance(stencil, str):
stencil = get_stencil(stencil)
mom_to_rr_dict = OrderedDict(moment_to_relaxation_rate_dict)
assert len(mom_to_rr_dict) == len(stencil), \
"The number of moments has to be the same as the number of stencil entries"
density_velocity_computation = DensityVelocityComputation(stencil, compressible, force_model)
if cumulant:
warn("Cumulant methods should be created with maxwellian_moments=True")
eq_values = get_cumulants_of_discrete_maxwellian_equilibrium(stencil, tuple(mom_to_rr_dict.keys()),
c_s_sq=c_s_sq, compressible=compressible,
order=equilibrium_order)
else:
eq_values = get_moments_of_discrete_maxwellian_equilibrium(stencil, tuple(mom_to_rr_dict.keys()),
c_s_sq=c_s_sq, compressible=compressible,
order=equilibrium_order)
rr_dict = OrderedDict([(mom, RelaxationInfo(eq_mom, rr))
for mom, rr, eq_mom in zip(mom_to_rr_dict.keys(), mom_to_rr_dict.values(), eq_values)])
if cumulant:
return CumulantBasedLbMethod(stencil, rr_dict, density_velocity_computation, force_model)
else:
return MomentBasedLbMethod(stencil, rr_dict, density_velocity_computation, force_model)
def create_with_continuous_maxwellian_eq_moments(stencil, moment_to_relaxation_rate_dict, compressible=False,
force_model=None, equilibrium_order=2,
cumulant=False, c_s_sq=sp.Rational(1, 3)):
r"""
Creates a moment-based LBM by taking a list of moments with corresponding relaxation rate. These moments are
relaxed against the moments of the continuous Maxwellian distribution.
For parameter description see :func:`lbmpy.methods.create_with_discrete_maxwellian_eq_moments`.
By using the continuous Maxwellian we automatically get a compressible model.
"""
if isinstance(stencil, str):
stencil = get_stencil(stencil)
mom_to_rr_dict = OrderedDict(moment_to_relaxation_rate_dict)
assert len(mom_to_rr_dict) == len(stencil), "The number of moments has to be equal to the number of stencil entries"
dim = len(stencil[0])
density_velocity_computation = DensityVelocityComputation(stencil, compressible, force_model)
if cumulant:
eq_values = get_cumulants_of_continuous_maxwellian_equilibrium(tuple(mom_to_rr_dict.keys()), dim, c_s_sq=c_s_sq,
order=equilibrium_order)
else:
eq_values = get_moments_of_continuous_maxwellian_equilibrium(tuple(mom_to_rr_dict.keys()), dim, c_s_sq=c_s_sq,
order=equilibrium_order)
if not compressible:
if not compressible and cumulant:
raise NotImplementedError("Incompressible cumulants not yet supported")
rho = density_velocity_computation.defined_symbols(order=0)[1]
u = density_velocity_computation.defined_symbols(order=1)[1]
eq_values = [compressible_to_incompressible_moment_value(em, rho, u) for em in eq_values]
rr_dict = OrderedDict([(mom, RelaxationInfo(eq_mom, rr))
for mom, rr, eq_mom in zip(mom_to_rr_dict.keys(), mom_to_rr_dict.values(), eq_values)])
if cumulant:
return CumulantBasedLbMethod(stencil, rr_dict, density_velocity_computation, force_model)
else:
return MomentBasedLbMethod(stencil, rr_dict, density_velocity_computation, force_model)
def create_generic_mrt(stencil, moment_eq_value_relaxation_rate_tuples, compressible=False,
force_model=None, cumulant=False):
r"""
Creates a generic moment-based LB method.
Args:
stencil: sequence of lattice velocities
moment_eq_value_relaxation_rate_tuples: sequence of tuples containing (moment, equilibrium value, relax. rate)
compressible: compressibility, determines calculation of velocity for force models
force_model: see create_with_discrete_maxwellian_eq_moments
cumulant: true for cumulant methods, False for moment-based methods
"""
density_velocity_computation = DensityVelocityComputation(stencil, compressible, force_model)
rr_dict = OrderedDict()
for moment, eq_value, rr in moment_eq_value_relaxation_rate_tuples:
moment = sp.sympify(moment)
rr_dict[moment] = RelaxationInfo(eq_value, rr)
if cumulant:
return CumulantBasedLbMethod(stencil, rr_dict, density_velocity_computation, force_model)
else:
return MomentBasedLbMethod(stencil, rr_dict, density_velocity_computation, force_model)
def create_from_equilibrium(stencil, equilibrium, moment_to_relaxation_rate_dict, compressible=False, force_model=None):
r"""
Creates a moment-based LB method using a given equilibrium distribution function
Args:
stencil: see create_with_discrete_maxwellian_eq_moments
equilibrium: list of equilibrium terms, dependent on rho and u, one for each stencil direction
moment_to_relaxation_rate_dict: relaxation rate for each moment, or a symbol/float if all should relaxed with
the same rate
compressible: see create_with_discrete_maxwellian_eq_moments
force_model: see create_with_discrete_maxwellian_eq_moments
"""
if isinstance(stencil, str):
stencil = get_stencil(stencil)
if any(isinstance(moment_to_relaxation_rate_dict, t) for t in (sp.Symbol, float, int)):
moment_to_relaxation_rate_dict = {m: moment_to_relaxation_rate_dict
for m in get_default_moment_set_for_stencil(stencil)}
mom_to_rr_dict = OrderedDict(moment_to_relaxation_rate_dict)
assert len(mom_to_rr_dict) == len(stencil), "The number of moments has to be equal to the number of stencil entries"
density_velocity_computation = DensityVelocityComputation(stencil, compressible, force_model)
rr_dict = OrderedDict([(mom, RelaxationInfo(discrete_moment(equilibrium, mom, stencil).expand(), rr))
for mom, rr in zip(mom_to_rr_dict.keys(), mom_to_rr_dict.values())])
return MomentBasedLbMethod(stencil, rr_dict, density_velocity_computation, force_model)
# ------------------------------------ SRT / TRT/ MRT Creators ---------------------------------------------------------
def create_srt(stencil, relaxation_rate, maxwellian_moments=False, **kwargs):
r"""Creates a single relaxation time (SRT) lattice Boltzmann model also known as BGK model.
Args:
stencil: nested tuple defining the discrete velocity space. See :func:`lbmpy.stencils.get_stencil`
relaxation_rate: relaxation rate (inverse of the relaxation time)
usually called :math:`\omega` in LBM literature
maxwellian_moments: determines if the discrete or continuous maxwellian equilibrium is
used to compute the equilibrium moments
Returns:
:class:`lbmpy.methods.MomentBasedLbMethod` instance
"""
if isinstance(stencil, str):
stencil = get_stencil(stencil)
moments = get_default_moment_set_for_stencil(stencil)
rr_dict = OrderedDict([(m, relaxation_rate) for m in moments])
if maxwellian_moments:
return create_with_continuous_maxwellian_eq_moments(stencil, rr_dict, **kwargs)
else:
return create_with_discrete_maxwellian_eq_moments(stencil, rr_dict, **kwargs)
def create_trt(stencil, relaxation_rate_even_moments, relaxation_rate_odd_moments,
maxwellian_moments=False, **kwargs):
"""
Creates a two relaxation time (TRT) lattice Boltzmann model, where even and odd moments are relaxed differently.
In the SRT model the exact wall position of no-slip boundaries depends on the viscosity, the TRT method does not
have this problem.
Parameters are similar to :func:`lbmpy.methods.create_srt`, but instead of one relaxation rate there are
two relaxation rates: one for even moments (determines viscosity) and one for odd moments.
If unsure how to choose the odd relaxation rate, use the function :func:`lbmpy.methods.create_trt_with_magic_number`
"""
if isinstance(stencil, str):
stencil = get_stencil(stencil)
moments = get_default_moment_set_for_stencil(stencil)
rr_dict = OrderedDict([(m, relaxation_rate_even_moments if is_even(m) else relaxation_rate_odd_moments)
for m in moments])
if maxwellian_moments:
return create_with_continuous_maxwellian_eq_moments(stencil, rr_dict, **kwargs)
else:
return create_with_discrete_maxwellian_eq_moments(stencil, rr_dict, **kwargs)
def create_trt_with_magic_number(stencil, relaxation_rate, magic_number=sp.Rational(3, 16), **kwargs):
"""
Creates a two relaxation time (TRT) lattice Boltzmann method, where the relaxation time for odd moments is
determines from the even moment relaxation time and a "magic number".
For possible parameters see :func:`lbmpy.methods.create_trt`
"""
rr_odd = relaxation_rate_from_magic_number(relaxation_rate, magic_number)
return create_trt(stencil, relaxation_rate_even_moments=relaxation_rate,
relaxation_rate_odd_moments=rr_odd, **kwargs)
def create_mrt_raw(stencil, relaxation_rates, maxwellian_moments=False, **kwargs):
"""Creates a MRT method using non-orthogonalized moments."""
if isinstance(stencil, str):
stencil = get_stencil(stencil)
moments = get_default_moment_set_for_stencil(stencil)
rr_dict = OrderedDict(zip(moments, relaxation_rates))
if maxwellian_moments:
return create_with_continuous_maxwellian_eq_moments(stencil, rr_dict, **kwargs)
else:
return create_with_discrete_maxwellian_eq_moments(stencil, rr_dict, **kwargs)
def create_mrt3(stencil, relaxation_rates, maxwellian_moments=False, **kwargs):
"""Creates a MRT with three relaxation times.
The first rate controls viscosity, the second the bulk viscosity and the last is used to relax higher order moments.
"""
def product(iterable):
return reduce(operator.mul, iterable, 1)
if isinstance(stencil, str):
stencil = get_stencil(stencil)
dim = len(stencil[0])
the_moment = MOMENT_SYMBOLS[:dim]
shear_tensor_off_diagonal = [product(t) for t in itertools.combinations(the_moment, 2)]
shear_tensor_diagonal = [m_i * m_i for m_i in the_moment]
shear_tensor_trace = sum(shear_tensor_diagonal)
shear_tensor_trace_free_diagonal = [dim * d - shear_tensor_trace for d in shear_tensor_diagonal]
rest = [default_moment for default_moment in get_default_moment_set_for_stencil(stencil)
if get_order(default_moment) != 2]
d = shear_tensor_off_diagonal + shear_tensor_trace_free_diagonal[:-1]
t = [shear_tensor_trace]
q = rest
if 'magic_number' in kwargs:
magic_number = kwargs['magic_number']
else:
magic_number = sp.Rational(3, 16)
if len(relaxation_rates) == 1:
relaxation_rates = [relaxation_rates[0],
relaxation_rate_from_magic_number(relaxation_rates[0], magic_number=magic_number),
1]
elif len(relaxation_rates) == 2:
relaxation_rates = [relaxation_rates[0],
relaxation_rate_from_magic_number(relaxation_rates[0], magic_number=magic_number),
relaxation_rates[1]]
relaxation_rates = [relaxation_rates[0]] * len(d) + \
[relaxation_rates[1]] * len(t) + \
[relaxation_rates[2]] * len(q)
all_moments = d + t + q
moment_to_rr = OrderedDict((m, rr) for m, rr in zip(all_moments, relaxation_rates))
if maxwellian_moments:
return create_with_continuous_maxwellian_eq_moments(stencil, moment_to_rr, **kwargs)
else:
return create_with_discrete_maxwellian_eq_moments(stencil, moment_to_rr, **kwargs)
def create_trt_kbc(dim, shear_relaxation_rate, higher_order_relaxation_rate, method_name='KBC-N4',
maxwellian_moments=False, **kwargs):
"""
Creates a method with two relaxation rates, one for lower order moments which determines the viscosity and
one for higher order moments. In entropic models this second relaxation rate is chosen subject to an entropy
condition. Which moments are relaxed by which rate is determined by the method_name
Args:
dim: 2 or 3, leads to stencil D2Q9 or D3Q27
shear_relaxation_rate: relaxation rate that determines viscosity
higher_order_relaxation_rate: relaxation rate for higher order moments
method_name: string 'KBC-Nx' where x can be an number from 1 to 4, for details see
"Karlin 2015: Entropic multi relaxation lattice Boltzmann models for turbulent flows"
maxwellian_moments: determines if the discrete or continuous maxwellian equilibrium is
used to compute the equilibrium moments
"""
def product(iterable):
return reduce(operator.mul, iterable, 1)
the_moment = MOMENT_SYMBOLS[:dim]
rho = [sp.Rational(1, 1)]
velocity = list(the_moment)
shear_tensor_off_diagonal = [product(t) for t in itertools.combinations(the_moment, 2)]
shear_tensor_diagonal = [m_i * m_i for m_i in the_moment]
shear_tensor_trace = sum(shear_tensor_diagonal)
shear_tensor_trace_free_diagonal = [dim * d - shear_tensor_trace for d in shear_tensor_diagonal]
poly_repr = exponents_to_polynomial_representations
energy_transport_tensor = list(poly_repr([a for a in moments_of_order(3, dim, True)
if 3 not in a]))
explicitly_defined = set(rho + velocity + shear_tensor_off_diagonal
+ shear_tensor_diagonal + energy_transport_tensor)
rest = list(set(poly_repr(moments_up_to_component_order(2, dim))) - explicitly_defined)
assert len(rest) + len(explicitly_defined) == 3**dim
# naming according to paper Karlin 2015: Entropic multi relaxation lattice Boltzmann models for turbulent flows
d = shear_tensor_off_diagonal + shear_tensor_trace_free_diagonal[:-1]
t = [shear_tensor_trace]
q = energy_transport_tensor
if method_name == 'KBC-N1':
decomposition = [d, t + q + rest]
elif method_name == 'KBC-N2':
decomposition = [d + t, q + rest]
elif method_name == 'KBC-N3':
decomposition = [d + q, t + rest]
elif method_name == 'KBC-N4':
decomposition = [d + t + q, rest]
else:
raise ValueError("Unknown model. Supported models KBC-Nx where x in (1,2,3,4)")
omega_s, omega_h = shear_relaxation_rate, higher_order_relaxation_rate
shear_part, rest_part = decomposition
relaxation_rates = [omega_s] + \
[omega_s] * len(velocity) + \
[omega_s] * len(shear_part) + \
[omega_h] * len(rest_part)
stencil = get_stencil("D2Q9") if dim == 2 else get_stencil("D3Q27")
all_moments = rho + velocity + shear_part + rest_part
moment_to_rr = OrderedDict((m, rr) for m, rr in zip(all_moments, relaxation_rates))
if maxwellian_moments:
return create_with_continuous_maxwellian_eq_moments(stencil, moment_to_rr, **kwargs)
else:
return create_with_discrete_maxwellian_eq_moments(stencil, moment_to_rr, **kwargs)
def create_mrt_orthogonal(stencil, relaxation_rate_getter=None, maxwellian_moments=False, **kwargs):
r"""
Returns a orthogonal multi-relaxation time model for the stencils D2Q9, D3Q15, D3Q19 and D3Q27.
These MRT methods are just one specific version - there are many MRT methods possible for all these stencils
which differ by the linear combination of moments and the grouping into equal relaxation times.
To create a generic MRT method use `create_with_discrete_maxwellian_eq_moments`
Args:
stencil: nested tuple defining the discrete velocity space. See `get_stencil`
relaxation_rate_getter: function getting a list of moments as argument, returning the associated relaxation
time. The default returns:
- 0 for moments of order 0 and 1 (conserved)
- :math:`\omega`: from moments of order 2 (rate that determines viscosity)
- numbered :math:`\omega_i` for the rest
maxwellian_moments: determines if the discrete or continuous maxwellian equilibrium is
used to compute the equilibrium moments
"""
if relaxation_rate_getter is None:
relaxation_rate_getter = default_relaxation_rate_names()
if isinstance(stencil, str):
stencil = get_stencil(stencil)
x, y, z = MOMENT_SYMBOLS
one = sp.Rational(1, 1)
is_cumulant = 'cumulant' in kwargs and kwargs['cumulant']
moment_to_relaxation_rate_dict = OrderedDict()
if have_same_entries(stencil, get_stencil("D2Q9")):
moments = get_default_moment_set_for_stencil(stencil)
orthogonal_moments = gram_schmidt(moments, stencil)
orthogonal_moments_scaled = [e * common_denominator(e) for e in orthogonal_moments]
nested_moments = list(sort_moments_into_groups_of_same_order(orthogonal_moments_scaled).values())
elif have_same_entries(stencil, get_stencil("D3Q15")):
sq = x ** 2 + y ** 2 + z ** 2
nested_moments = [
[one, x, y, z], # [0, 3, 5, 7]
[sq - 1], # [1]
[3 * sq ** 2 - 6 * sq + 1], # [2]
[(3 * sq - 5) * x, (3 * sq - 5) * y, (3 * sq - 5) * z], # [4, 6, 8]
[3 * x ** 2 - sq, y ** 2 - z ** 2, x * y, y * z, x * z], # [9, 10, 11, 12, 13]
[x * y * z]
]
elif have_same_entries(stencil, get_stencil("D3Q19")):
# This MRT variant mentioned in the dissertation of Ulf Schiller
# "Thermal fluctuations and boundary conditions in the lattice Boltzmann method" (2008), p. 24ff
# There are some typos in the moment matrix on p.27
# The here implemented ordering of the moments is however different from that reference (Eq. 2.61-2.63)
# The moments are weighted-orthogonal (Eq. 2.58)
# Further references:
# Duenweg, B., Schiller, U. D., & Ladd, A. J. (2007). Statistical mechanics of the fluctuating
# lattice Boltzmann equation. Physical Review E, 76(3)
# Chun, B., & Ladd, A. J. (2007). Interpolated boundary condition for lattice Boltzmann simulations of
# flows in narrow gaps. Physical review E, 75(6)
if is_cumulant:
nested_moments = [
[sp.sympify(1), x, y, z], # conserved
[x * y,
x * z,
y * z,
x ** 2 - y ** 2,
x ** 2 - z ** 2], # shear
[x ** 2 + y ** 2 + z ** 2], # bulk
[x * y ** 2 + x * z ** 2,
x ** 2 * y + y * z ** 2,
x ** 2 * z + y ** 2 * z],
[x * y ** 2 - x * z ** 2,
x ** 2 * y - y * z ** 2,
x ** 2 * z - y ** 2 * z],
[x ** 2 * y ** 2 - 2 * x ** 2 * z ** 2 + y ** 2 * z ** 2,
x ** 2 * y ** 2 + x ** 2 * z ** 2 - 2 * y ** 2 * z ** 2],
[x ** 2 * y ** 2 + x ** 2 * z ** 2 + y ** 2 * z ** 2],
]
else:
sq = x ** 2 + y ** 2 + z ** 2
nested_moments = [
[one, x, y, z], # [0, 3, 5, 7]
[sq - 1], # [1]
[3 * sq ** 2 - 6 * sq + 1], # [2]
[(3 * sq - 5) * x, (3 * sq - 5) * y, (3 * sq - 5) * z], # [4, 6, 8]
[3 * x ** 2 - sq, y ** 2 - z ** 2, x * y, y * z, x * z], # [9, 11, 13, 14, 15]
[(2 * sq - 3) * (3 * x ** 2 - sq), (2 * sq - 3) * (y ** 2 - z ** 2)], # [10, 12]
[(y ** 2 - z ** 2) * x, (z ** 2 - x ** 2) * y, (x ** 2 - y ** 2) * z] # [16, 17, 18]
]
elif have_same_entries(stencil, get_stencil("D3Q27")):
if is_cumulant:
nested_moments = [
[sp.sympify(1), x, y, z], # conserved
[x * y,
x * z,
y * z,
x ** 2 - y ** 2,
x ** 2 - z ** 2], # shear
[x ** 2 + y ** 2 + z ** 2], # bulk
[x * y ** 2 + x * z ** 2,
x ** 2 * y + y * z ** 2,
x ** 2 * z + y ** 2 * z],
[x * y ** 2 - x * z ** 2,
x ** 2 * y - y * z ** 2,
x ** 2 * z - y ** 2 * z],
[x * y * z],
[x ** 2 * y ** 2 - 2 * x ** 2 * z ** 2 + y ** 2 * z ** 2,
x ** 2 * y ** 2 + x ** 2 * z ** 2 - 2 * y ** 2 * z ** 2],
[x ** 2 * y ** 2 + x ** 2 * z ** 2 + y ** 2 * z ** 2],
[x ** 2 * y * z,
x * y ** 2 * z,
x * y * z ** 2],
[x ** 2 * y ** 2 * z,
x ** 2 * y * z ** 2,
x * y ** 2 * z ** 2],
[x ** 2 * y ** 2 * z ** 2],
]
else:
xsq, ysq, zsq = x ** 2, y ** 2, z ** 2
all_moments = [
sp.Rational(1, 1), # 0
x, y, z, # 1, 2, 3
x * y, x * z, y * z, # 4, 5, 6
xsq - ysq, # 7
(xsq + ysq + zsq) - 3 * zsq, # 8
(xsq + ysq + zsq) - 2, # 9
3 * (x * ysq + x * zsq) - 4 * x, # 10
3 * (xsq * y + y * zsq) - 4 * y, # 11
3 * (xsq * z + ysq * z) - 4 * z, # 12
x * ysq - x * zsq, # 13
xsq * y - y * zsq, # 14
xsq * z - ysq * z, # 15
x * y * z, # 16
3 * (xsq * ysq + xsq * zsq + ysq * zsq) - 4 * (xsq + ysq + zsq) + 4, # 17
3 * (xsq * ysq + xsq * zsq - 2 * ysq * zsq) - 2 * (2 * xsq - ysq - zsq), # 18
3 * (xsq * ysq - xsq * zsq) - 2 * (ysq - zsq), # 19
3 * (xsq * y * z) - 2 * (y * z), # 20
3 * (x * ysq * z) - 2 * (x * z), # 21
3 * (x * y * zsq) - 2 * (x * y), # 22
9 * (x * ysq * zsq) - 6 * (x * ysq + x * zsq) + 4 * x, # 23
9 * (xsq * y * zsq) - 6 * (xsq * y + y * zsq) + 4 * y, # 24
9 * (xsq * ysq * z) - 6 * (xsq * z + ysq * z) + 4 * z, # 25
27 * (xsq * ysq * zsq) - 18 * (xsq * ysq + xsq * zsq + ysq * zsq) + 12 * (xsq + ysq + zsq) - 8, # 26
]
nested_moments = list(sort_moments_into_groups_of_same_order(all_moments).values())
else:
raise NotImplementedError("No MRT model is available (yet) for this stencil. "
"Create a custom MRT using 'create_with_discrete_maxwellian_eq_moments'")
for moment_list in nested_moments:
rr = relaxation_rate_getter(moment_list)
for m in moment_list:
moment_to_relaxation_rate_dict[m] = rr
if maxwellian_moments:
return create_with_continuous_maxwellian_eq_moments(stencil, moment_to_relaxation_rate_dict, **kwargs)
else:
return create_with_discrete_maxwellian_eq_moments(stencil, moment_to_relaxation_rate_dict, **kwargs)
# ----------------------------------------- Comparison view for notebooks ----------------------------------------------
def compare_moment_based_lb_methods(reference, other, show_deviations_only=False):
import ipy_table
table = []
caption_rows = [len(table)]
table.append(['Shared Moment', 'ref', 'other', 'difference'])
reference_moments = set(reference.moments)
other_moments = set(other.moments)
for moment in reference_moments.intersection(other_moments):
reference_value = reference.relaxation_info_dict[moment].equilibrium_value
other_value = other.relaxation_info_dict[moment].equilibrium_value
diff = sp.simplify(reference_value - other_value)
if show_deviations_only and diff == 0:
pass
else:
table.append(["$%s$" % (sp.latex(moment), ),
"$%s$" % (sp.latex(reference_value), ),
"$%s$" % (sp.latex(other_value), ),
"$%s$" % (sp.latex(diff),)])
only_in_ref = reference_moments - other_moments
if only_in_ref:
caption_rows.append(len(table))
table.append(['Only in Ref', 'value', '', ''])
for moment in only_in_ref:
val = reference.relaxation_info_dict[moment].equilibrium_value
table.append(["$%s$" % (sp.latex(moment),),
"$%s$" % (sp.latex(val),),
" ", " "])
only_in_other = other_moments - reference_moments
if only_in_other:
caption_rows.append(len(table))
table.append(['Only in Other', '', 'value', ''])
for moment in only_in_other:
val = other.relaxation_info_dict[moment].equilibrium_value
table.append(["$%s$" % (sp.latex(moment),),
" ",
"$%s$" % (sp.latex(val),),
" "])
table_display = ipy_table.make_table(table)
for row_idx in caption_rows:
for col in range(4):
ipy_table.set_cell_style(row_idx, col, color='#bbbbbb')
return table_display
from collections import OrderedDict
import sympy as sp
from lbmpy.cumulants import cumulant_as_function_of_raw_moments, raw_moment_as_function_of_cumulants
from lbmpy.methods.abstractlbmethod import AbstractLbMethod, LbmCollisionRule, RelaxationInfo
from lbmpy.methods.conservedquantitycomputation import AbstractConservedQuantityComputation
from lbmpy.moments import (
MOMENT_SYMBOLS, extract_monomials, moment_matrix, monomial_to_polynomial_transformation_matrix)
from pystencils import Assignment
from pystencils.sympyextensions import fast_subs, subs_additive
class CumulantBasedLbMethod(AbstractLbMethod):
def __init__(self, stencil, cumulant_to_relaxation_info_dict, conserved_quantity_computation, force_model=None):
assert isinstance(conserved_quantity_computation, AbstractConservedQuantityComputation)
super(CumulantBasedLbMethod, self).__init__(stencil)
self._force_model = force_model
self._cumulant_to_relaxation_info_dict = OrderedDict(cumulant_to_relaxation_info_dict.items())
self._conserved_quantity_computation = conserved_quantity_computation
self._weights = None
@property
def force_model(self):
return self._force_model
@property
def relaxation_info_dict(self):
return self._cumulant_to_relaxation_info_dict
@property
def zeroth_order_equilibrium_moment_symbol(self, ):
return self._conserved_quantity_computation.zeroth_order_moment_symbol
@property
def first_order_equilibrium_moment_symbols(self, ):
return self._conserved_quantity_computation.first_order_moment_symbols
def set_first_moment_relaxation_rate(self, relaxation_rate):
for e in MOMENT_SYMBOLS[:self.dim]:
assert e in self._cumulant_to_relaxation_info_dict, \
"First cumulants are not relaxed separately by this method"
for e in MOMENT_SYMBOLS[:self.dim]:
prev_entry = self._cumulant_to_relaxation_info_dict[e]
new_entry = RelaxationInfo(prev_entry[0], relaxation_rate)
self._cumulant_to_relaxation_info_dict[e] = new_entry
@property
def cumulants(self):
return tuple(self._cumulant_to_relaxation_info_dict.keys())
@property
def cumulant_equilibrium_values(self):
return tuple([e.equilibrium_value for e in self._cumulant_to_relaxation_info_dict.values()])
@property
def relaxation_rates(self):
return tuple([e.relaxation_rate for e in self._cumulant_to_relaxation_info_dict.values()])
@property
def conserved_quantity_computation(self):
return self._conserved_quantity_computation
@property
def weights(self):
if self._weights is None:
self._weights = self._compute_weights()
return self._weights
def override_weights(self, weights):
assert len(weights) == len(self.stencil)
self._weights = weights
def _repr_html_(self):
table = """
<table style="border:none; width: 100%">
<tr {nb}>
<th {nb} >Cumulant</th>
<th {nb} >Eq. Value </th>
<th {nb} >Relaxation Rate</th>
</tr>
{content}
</table>
"""
content = ""
for cumulant, (eq_value, rr) in self._cumulant_to_relaxation_info_dict.items():
vals = {
'rr': sp.latex(rr),
'cumulant': sp.latex(cumulant),
'eq_value': sp.latex(eq_value),
'nb': 'style="border:none"',
}
content += """<tr {nb}>
<td {nb}>${cumulant}$</td>
<td {nb}>${eq_value}$</td>
<td {nb}>${rr}$</td>
</tr>\n""".format(**vals)
return table.format(content=content, nb='style="border:none"')
def get_equilibrium(self, conserved_quantity_equations=None):
d = sp.eye(len(self.relaxation_rates))
return self._get_collision_rule_with_relaxation_matrix(d, conserved_quantity_equations,
False, False, False, False)
def get_equilibrium_terms(self):
equilibrium = self.get_equilibrium()
return sp.Matrix([eq.rhs for eq in equilibrium.main_assignments])
def get_collision_rule(self, conserved_quantity_equations=None, moment_subexpressions=False,
pre_collision_subexpressions=True, post_collision_subexpressions=False):
return self._get_collision_rule_with_relaxation_matrix(sp.diag(*self.relaxation_rates),
conserved_quantity_equations,
moment_subexpressions, pre_collision_subexpressions,
post_collision_subexpressions)
def _compute_weights(self):
replacements = self._conserved_quantity_computation.default_values
eq = self.get_equilibrium()
ac = eq.new_with_substitutions(replacements, substitute_on_lhs=False).new_without_subexpressions()
new_main_eqs = [Assignment(e.lhs,
subs_additive(e.rhs, sp.sympify(1), sum(self.pre_collision_pdf_symbols),
required_match_replacement=1.0))
for e in ac.main_assignments]
ac = ac.copy(new_main_eqs)
weights = []
for eq in ac.main_assignments:
value = eq.rhs.expand()
assert len(value.atoms(sp.Symbol)) == 0, "Failed to compute weights"
weights.append(value)
return weights
def _get_collision_rule_with_relaxation_matrix(self, relaxation_matrix, conserved_quantity_equations=None,
moment_subexpressions=False, pre_collision_subexpressions=True,
post_collision_subexpressions=False, include_force_terms=True):
def tuple_to_symbol(exp, prefix):
dim = len(exp)
format_string = prefix + "_" + "_".join(["%d"] * dim)
return sp.Symbol(format_string % exp)
def substitute_conserved_quantities(expressions, cqe):
cqe = cqe.new_without_subexpressions()
substitution_dict = {eq.rhs: eq.lhs for eq in cqe.main_assignments}
density = cqe.main_assignments[0].lhs
substitution_dict.update({density * eq.rhs: density * eq.lhs for eq in cqe.main_assignments[1:]})
return [fast_subs(e, substitution_dict) for e in expressions]
f = self.pre_collision_pdf_symbols
if conserved_quantity_equations is None:
conserved_quantity_equations = self._conserved_quantity_computation.equilibrium_input_equations_from_pdfs(f)
subexpressions = conserved_quantity_equations.all_assignments
# 1) Determine monomial indices, and arrange them such that the zeroth and first order indices come first
indices = list(extract_monomials(self.cumulants, dim=len(self.stencil[0])))
zeroth_moment_exponent = (0,) * self.dim
first_moment_exponents = [tuple([1 if i == j else 0 for i in range(self.dim)]) for j in range(self.dim)]
lower_order_indices = [zeroth_moment_exponent] + first_moment_exponents
num_lower_order_indices = len(lower_order_indices)
assert all(e in indices for e in lower_order_indices), \
"Cumulant system does not contain relaxation rules for zeroth and first order cumulants"
higher_order_indices = [e for e in indices if e not in lower_order_indices]
indices = lower_order_indices + higher_order_indices # reorder
# 2) Transform pdfs to moments
moment_transformation_matrix = moment_matrix(indices, self.stencil)
moments = moment_transformation_matrix * sp.Matrix(f)
moments = substitute_conserved_quantities(moments, conserved_quantity_equations)
if moment_subexpressions:
symbols = [tuple_to_symbol(t, "m") for t in higher_order_indices]
subexpressions += [Assignment(sym, moment)
for sym, moment in zip(symbols, moments[num_lower_order_indices:])]
moments = moments[:num_lower_order_indices] + symbols
# 3) Transform moments to monomial cumulants
moments_dict = {idx: m for idx, m in zip(indices, moments)}
monomial_cumulants = [cumulant_as_function_of_raw_moments(idx, moments_dict) for idx in indices]
if pre_collision_subexpressions:
symbols = [tuple_to_symbol(t, "pre_c") for t in higher_order_indices]
subexpressions += [Assignment(sym, c)
for sym, c in zip(symbols, monomial_cumulants[num_lower_order_indices:])]
monomial_cumulants = monomial_cumulants[:num_lower_order_indices] + symbols
# 4) Transform monomial to polynomial cumulants which are then relaxed and transformed back
mon_to_poly = monomial_to_polynomial_transformation_matrix(indices, self.cumulants)
poly_values = mon_to_poly * sp.Matrix(monomial_cumulants)
eq_values = sp.Matrix(self.cumulant_equilibrium_values)
collided_poly_values = poly_values + relaxation_matrix * (eq_values - poly_values) # collision
relaxed_monomial_cumulants = mon_to_poly.inv() * collided_poly_values
if post_collision_subexpressions:
symbols = [tuple_to_symbol(t, "post_c") for t in higher_order_indices]
subexpressions += [Assignment(sym, c)
for sym, c in zip(symbols, relaxed_monomial_cumulants[num_lower_order_indices:])]
relaxed_monomial_cumulants = relaxed_monomial_cumulants[:num_lower_order_indices] + symbols
# 5) Transform post-collision cumulant back to moments and from there to pdfs
cumulant_dict = {idx: value for idx, value in zip(indices, relaxed_monomial_cumulants)}
collided_moments = [raw_moment_as_function_of_cumulants(idx, cumulant_dict) for idx in indices]
result = moment_transformation_matrix.inv() * sp.Matrix(collided_moments)
main_assignments = [Assignment(sym, val) for sym, val in zip(self.post_collision_pdf_symbols, result)]
# 6) Add forcing terms
if self._force_model is not None and include_force_terms:
force_model_terms = self._force_model(self)
force_term_symbols = sp.symbols("forceTerm_:%d" % (len(force_model_terms,)))
force_subexpressions = [Assignment(sym, force_model_term)
for sym, force_model_term in zip(force_term_symbols, force_model_terms)]
subexpressions += force_subexpressions
main_assignments = [Assignment(eq.lhs, eq.rhs + force_term_symbol)
for eq, force_term_symbol in zip(main_assignments, force_term_symbols)]
sh = {'relaxation_rates': list(self.relaxation_rates)}
return LbmCollisionRule(self, main_assignments, subexpressions, simplification_hints=sh)
import sympy as sp
from lbmpy.maxwellian_equilibrium import get_weights
from lbmpy.methods.abstractlbmethod import AbstractLbMethod, LbmCollisionRule
from lbmpy.methods.conservedquantitycomputation import DensityVelocityComputation
from pystencils import Assignment
class EntropicEquilibriumSRT(AbstractLbMethod):
"""Equilibrium from 'Minimal entropic kinetic models for hydrodynamics'
Ansumali, S. ; Karlin, I. V; Öttinger, H. C, (2003)
"""
def __init__(self, stencil, relaxation_rate, force_model, conserved_quantity_calculation):
super(EntropicEquilibriumSRT, self).__init__(stencil)
self._cqc = conserved_quantity_calculation
self._weights = get_weights(stencil, c_s_sq=sp.Rational(1, 3))
self._relaxationRate = relaxation_rate
self._forceModel = force_model
self.shear_relaxation_rate = relaxation_rate
@property
def conserved_quantity_computation(self):
return self._cqc
@property
def weights(self):
return self._weights
@property
def zeroth_order_equilibrium_moment_symbol(self, ):
return self._cqc.zeroth_order_moment_symbol
@property
def first_order_equilibrium_moment_symbols(self, ):
return self._cqc.first_order_moment_symbols
def get_equilibrium(self, conserved_quantity_equations=None, include_force_terms=False):
return self._get_collision_rule_with_relaxation_rate(1,
conserved_quantity_equations=conserved_quantity_equations,
include_force_terms=include_force_terms)
def get_equilibrium_terms(self):
equilibrium = self.get_equilibrium()
return sp.Matrix([eq.rhs for eq in equilibrium.main_assignments])
def _get_collision_rule_with_relaxation_rate(self, relaxation_rate, include_force_terms=True,
conserved_quantity_equations=None):
f = sp.Matrix(self.pre_collision_pdf_symbols)
rho = self._cqc.zeroth_order_moment_symbol
u = self._cqc.first_order_moment_symbols
if conserved_quantity_equations is None:
conserved_quantity_equations = self._cqc.equilibrium_input_equations_from_pdfs(f)
all_subexpressions = conserved_quantity_equations.all_assignments
eq = []
for w_i, direction in zip(self.weights, self.stencil):
f_i = rho * w_i
for u_a, e_ia in zip(u, direction):
b = sp.sqrt(1 + 3 * u_a ** 2)
f_i *= (2 - b) * ((2 * u_a + b) / (1 - u_a)) ** e_ia
eq.append(f_i)
collision_eqs = [Assignment(lhs, (1 - relaxation_rate) * f_i + relaxation_rate * eq_i)
for lhs, f_i, eq_i in zip(self.post_collision_pdf_symbols, self.pre_collision_pdf_symbols, eq)]
if (self._forceModel is not None) and include_force_terms:
force_model_terms = self._forceModel(self)
force_term_symbols = sp.symbols("forceTerm_:%d" % (len(force_model_terms, )))
force_subexpressions = [Assignment(sym, force_model_term)
for sym, force_model_term in zip(force_term_symbols, force_model_terms)]
all_subexpressions += force_subexpressions
collision_eqs = [Assignment(eq.lhs, eq.rhs + force_term_symbol)
for eq, force_term_symbol in zip(collision_eqs, force_term_symbols)]
cr = LbmCollisionRule(self, collision_eqs, all_subexpressions)
cr.simplification_hints['relaxation_rates'] = []
return cr
def get_collision_rule(self, conserved_quantity_equations=None):
return self._get_collision_rule_with_relaxation_rate(self._relaxationRate,
conserved_quantity_equations=conserved_quantity_equations)
def create_srt_entropic(stencil, relaxation_rate, force_model, compressible):
if not compressible:
raise NotImplementedError("entropic-srt only implemented for compressible models")
density_velocity_computation = DensityVelocityComputation(stencil, compressible, force_model)
return EntropicEquilibriumSRT(stencil, relaxation_rate, force_model, density_velocity_computation)
from collections import OrderedDict
import sympy as sp
from lbmpy.methods.abstractlbmethod import AbstractLbMethod, LbmCollisionRule, RelaxationInfo
from lbmpy.methods.conservedquantitycomputation import AbstractConservedQuantityComputation
from lbmpy.moments import MOMENT_SYMBOLS, moment_matrix
from pystencils import Assignment
from pystencils.sympyextensions import subs_additive
class MomentBasedLbMethod(AbstractLbMethod):
def __init__(self, stencil, moment_to_relaxation_info_dict, conserved_quantity_computation=None, force_model=None):
"""
Moment based LBM is a class to represent the single (SRT), two (TRT) and multi relaxation time (MRT) methods.
These methods work by transforming the pdfs into moment space using a linear transformation. In the moment
space each component (moment) is relaxed to an equilibrium moment by a certain relaxation rate. These
equilibrium moments can e.g. be determined by taking the equilibrium moments of the continuous Maxwellian.
Args:
stencil: see :func:`lbmpy.stencils.get_stencil`
moment_to_relaxation_info_dict: a dictionary mapping moments in either tuple or polynomial formulation
to a RelaxationInfo, which consists of the corresponding equilibrium moment
and a relaxation rate
conserved_quantity_computation: instance of :class:`lbmpy.methods.AbstractConservedQuantityComputation`.
This determines how conserved quantities are computed, and defines
the symbols used in the equilibrium moments like e.g. density and velocity
force_model: force model instance, or None if no forcing terms are required
"""
assert isinstance(conserved_quantity_computation, AbstractConservedQuantityComputation)
super(MomentBasedLbMethod, self).__init__(stencil)
self._forceModel = force_model
self._momentToRelaxationInfoDict = OrderedDict(moment_to_relaxation_info_dict.items())
self._conservedQuantityComputation = conserved_quantity_computation
self._weights = None
@property
def force_model(self):
return self._forceModel
@property
def relaxation_info_dict(self):
return self._momentToRelaxationInfoDict
@property
def conserved_quantity_computation(self):
return self._conservedQuantityComputation
@property
def moments(self):
return tuple(self._momentToRelaxationInfoDict.keys())
@property
def moment_equilibrium_values(self):
return tuple([e.equilibrium_value for e in self._momentToRelaxationInfoDict.values()])
@property
def relaxation_rates(self):
return tuple([e.relaxation_rate for e in self._momentToRelaxationInfoDict.values()])
@property
def zeroth_order_equilibrium_moment_symbol(self, ):
return self._conservedQuantityComputation.zeroth_order_moment_symbol
@property
def first_order_equilibrium_moment_symbols(self, ):
return self._conservedQuantityComputation.first_order_moment_symbols
@property
def weights(self):
if self._weights is None:
self._weights = self._compute_weights()
return self._weights
def override_weights(self, weights):
assert len(weights) == len(self.stencil)
self._weights = weights
def get_equilibrium(self, conserved_quantity_equations=None, include_force_terms=False):
relaxation_matrix = sp.eye(len(self.relaxation_rates))
return self._collision_rule_with_relaxation_matrix(relaxation_matrix,
conserved_quantity_equations=conserved_quantity_equations,
include_force_terms=include_force_terms)
def get_equilibrium_terms(self):
equilibrium = self.get_equilibrium()
return sp.Matrix([eq.rhs for eq in equilibrium.main_assignments])
def get_collision_rule(self, conserved_quantity_equations=None):
d = sp.diag(*self.relaxation_rates)
relaxation_rate_sub_expressions, d = self._generate_relaxation_matrix(d)
ac = self._collision_rule_with_relaxation_matrix(d, relaxation_rate_sub_expressions,
True, conserved_quantity_equations)
return ac
def set_zeroth_moment_relaxation_rate(self, relaxation_rate):
e = sp.Rational(1, 1)
prev_entry = self._momentToRelaxationInfoDict[e]
new_entry = RelaxationInfo(prev_entry[0], relaxation_rate)
self._momentToRelaxationInfoDict[e] = new_entry
def set_first_moment_relaxation_rate(self, relaxation_rate):
for e in MOMENT_SYMBOLS[:self.dim]:
assert e in self._momentToRelaxationInfoDict, "First moments are not relaxed separately by this method"
for e in MOMENT_SYMBOLS[:self.dim]:
prev_entry = self._momentToRelaxationInfoDict[e]
new_entry = RelaxationInfo(prev_entry[0], relaxation_rate)
self._momentToRelaxationInfoDict[e] = new_entry
@property
def collision_matrix(self):
pdfs_to_moments = self.moment_matrix
relaxation_matrix = sp.diag(*self.relaxation_rates)
return pdfs_to_moments.inv() * relaxation_matrix * pdfs_to_moments
@property
def inverse_collision_matrix(self):
pdfs_to_moments = self.moment_matrix
inverse_relaxation_matrix = sp.diag(*[1 / e for e in self.relaxation_rates])
return pdfs_to_moments.inv() * inverse_relaxation_matrix * pdfs_to_moments
@property
def moment_matrix(self):
return moment_matrix(self.moments, self.stencil)
def __getstate__(self):
# Workaround for a bug in joblib
self._momentToRelaxationInfoDictToPickle = [i for i in self._momentToRelaxationInfoDict.items()]
return self.__dict__
def _repr_html_(self):
table = """
<table style="border:none; width: 100%">
<tr {nb}>
<th {nb} >Moment</th>
<th {nb} >Eq. Value </th>
<th {nb} >Relaxation Rate</th>
</tr>
{content}
</table>
"""
content = ""
for moment, (eq_value, rr) in self._momentToRelaxationInfoDict.items():
vals = {
'rr': sp.latex(rr),
'moment': sp.latex(moment),
'eq_value': sp.latex(eq_value),
'nb': 'style="border:none"',
}
content += """<tr {nb}>
<td {nb}>${moment}$</td>
<td {nb}>${eq_value}$</td>
<td {nb}>${rr}$</td>
</tr>\n""".format(**vals)
return table.format(content=content, nb='style="border:none"')
def _compute_weights(self):
replacements = self._conservedQuantityComputation.default_values
ac = self.get_equilibrium(include_force_terms=False)
ac = ac.new_with_substitutions(replacements, substitute_on_lhs=False).new_without_subexpressions()
new_assignments = [Assignment(e.lhs,
subs_additive(e.rhs, sp.sympify(1), sum(self.pre_collision_pdf_symbols),
required_match_replacement=1.0))
for e in ac.main_assignments]
ac = ac.copy(new_assignments)
weights = []
for eq in ac.main_assignments:
value = eq.rhs.expand()
assert len(value.atoms(sp.Symbol)) == 0, "Failed to compute weights " + str(value)
weights.append(value)
return weights
def _collision_rule_with_relaxation_matrix(self, d, additional_subexpressions=(), include_force_terms=True,
conserved_quantity_equations=None):
f = sp.Matrix(self.pre_collision_pdf_symbols)
pdf_to_moment = self.moment_matrix
m_eq = sp.Matrix(self.moment_equilibrium_values)
collision_rule = f + pdf_to_moment.inv() * d * (m_eq - pdf_to_moment * f)
collision_eqs = [Assignment(lhs, rhs) for lhs, rhs in zip(self.post_collision_pdf_symbols, collision_rule)]
if conserved_quantity_equations is None:
conserved_quantity_equations = self._conservedQuantityComputation.equilibrium_input_equations_from_pdfs(f)
simplification_hints = conserved_quantity_equations.simplification_hints.copy()
simplification_hints.update(self._conservedQuantityComputation.defined_symbols())
simplification_hints['relaxation_rates'] = [d[i, i] for i in range(d.rows)]
all_subexpressions = list(additional_subexpressions) + conserved_quantity_equations.all_assignments
if self._forceModel is not None and include_force_terms:
force_model_terms = self._forceModel(self)
force_term_symbols = sp.symbols("forceTerm_:%d" % (len(force_model_terms,)))
force_subexpressions = [Assignment(sym, force_model_term)
for sym, force_model_term in zip(force_term_symbols, force_model_terms)]
all_subexpressions += force_subexpressions
collision_eqs = [Assignment(eq.lhs, eq.rhs + force_term_symbol)
for eq, force_term_symbol in zip(collision_eqs, force_term_symbols)]
simplification_hints['force_terms'] = force_term_symbols
return LbmCollisionRule(self, collision_eqs, all_subexpressions,
simplification_hints)
@staticmethod
def _generate_relaxation_matrix(relaxation_matrix):
"""
For SRT and TRT the equations can be easier simplified if the relaxation times are symbols, not numbers.
This function replaces the numbers in the relaxation matrix with symbols in this case, and returns also
the subexpressions, that assign the number to the newly introduced symbol
"""
rr = [relaxation_matrix[i, i] for i in range(relaxation_matrix.rows)]
unique_relaxation_rates = set(rr)
if len(unique_relaxation_rates) <= 2:
# special handling for SRT and TRT
subexpressions = {}
for rt in unique_relaxation_rates:
rt = sp.sympify(rt)
if not isinstance(rt, sp.Symbol):
rt_symbol = sp.Symbol("rr_%d" % (len(subexpressions),))
subexpressions[rt] = rt_symbol
new_rr = [subexpressions[sp.sympify(e)] if sp.sympify(e) in subexpressions else e
for e in rr]
substitutions = [Assignment(e[1], e[0]) for e in subexpressions.items()]
return substitutions, sp.diag(*new_rr)
else:
return [], relaxation_matrix
def analytic_rising_speed(gravitational_acceleration, bubble_diameter, viscosity_gas):
r"""
Calculated the analytical rising speed of a bubble. This is the expected end rising speed.
Args:
gravitational_acceleration: the gravitational acceleration acting in the simulation scenario. Usually it gets
calculated based on dimensionless parameters which describe the scenario
bubble_diameter: the diameter of the bubble at the beginning of the simulation
viscosity_gas: the viscosity of the fluid inside the bubble
"""
result = -(gravitational_acceleration * bubble_diameter * bubble_diameter) / (12.0 * viscosity_gas)
return result
def laplacian(phi_field):
r"""
Get a symbolic expression for the laplacian.
Args:
phi_field: the phase-field on which the laplacian is applied
"""
lp_phi = 16.0 * ((phi_field[1, 0, 0]) + (phi_field[-1, 0, 0])
+ (phi_field[0, 1, 0]) + (phi_field[0, -1, 0])
+ (phi_field[0, 0, 1]) + (phi_field[0, 0, -1])) \
+ 1.0 * (
(phi_field[1, 1, 1]) + (phi_field[-1, 1, 1])
+ (phi_field[1, -1, 1]) + (phi_field[-1, -1, 1])
+ (phi_field[1, 1, -1]) + (phi_field[-1, 1, -1])
+ (phi_field[1, -1, -1]) + (phi_field[-1, -1, -1])) \
+ 4.0 * (
(phi_field[1, 1, 0]) + (phi_field[-1, 1, 0])
+ (phi_field[1, -1, 0]) + (phi_field[-1, -1, 0])
+ (phi_field[1, 0, 1]) + (phi_field[-1, 0, 1])
+ (phi_field[1, 0, -1]) + (phi_field[-1, 0, -1])
+ (phi_field[0, 1, 1]) + (phi_field[0, -1, 1])
+ (phi_field[0, 1, -1]) + (phi_field[0, -1, -1])) \
- 152.0 * phi_field[0, 0, 0]
lp_phi = lp_phi / 36
return lp_phi
def isotropic_gradient(phi_field):
r"""
Get a symbolic expression for the isotropic gradient of the phase-field
Args:
phi_field: the phase-field on which the isotropic gradient is applied
"""
grad_phi_x = 16.00 * (phi_field[1, 0, 0] - phi_field[-1, 0, 0])\
+ (phi_field[1, 1, 1] - phi_field[-1, 1, 1] + phi_field[1, -1, 1] - phi_field[-1, -1, 1]
+ phi_field[1, 1, -1] - phi_field[-1, 1, -1] + phi_field[1, -1, -1] - phi_field[-1, -1, -1])\
+ 4.00 * (phi_field[1, 1, 0] - phi_field[-1, 1, 0] + phi_field[1, -1, 0] - phi_field[-1, -1, 0]
+ phi_field[1, 0, 1] - phi_field[-1, 0, 1] + phi_field[1, 0, -1] - phi_field[-1, 0, -1])
grad_phi_y = 16.00 * (phi_field[0, 1, 0] - phi_field[0, -1, 0]) \
+ (phi_field[1, 1, 1] + phi_field[-1, 1, 1] - phi_field[1, -1, 1] - phi_field[-1, -1, 1]
+ phi_field[1, 1, -1] + phi_field[-1, 1, -1] - phi_field[1, -1, -1] - phi_field[-1, -1, -1])\
+ 4.00 * (phi_field[1, 1, 0] + phi_field[-1, 1, 0] - phi_field[1, -1, 0] - phi_field[-1, -1, 0]
+ phi_field[0, 1, 1] - phi_field[0, -1, 1] + phi_field[0, 1, -1] - phi_field[0, -1, -1])
grad_phi_z = 16.00 * (phi_field[0, 0, 1] - phi_field[0, 0, -1]) \
+ (phi_field[1, 1, 1] + phi_field[-1, 1, 1] + phi_field[1, -1, 1] + phi_field[-1, -1, 1]
- phi_field[1, 1, -1] - phi_field[-1, 1, -1] - phi_field[1, -1, -1] - phi_field[-1, -1, -1])\
+ 4.00 * (phi_field[1, 0, 1] + phi_field[-1, 0, 1] - phi_field[1, 0, -1] - phi_field[-1, 0, -1]
+ phi_field[0, 1, 1] + phi_field[0, -1, 1] - phi_field[0, 1, -1] - phi_field[0, -1, -1])
grad = [grad_phi_x / 72, grad_phi_y / 72, grad_phi_z / 72]
return grad
import sympy as sp
import numpy as np
from lbmpy.forcemodels import Simple
class MultiphaseForceModel:
r"""
A force model based on PhysRevE.96.053301. This model realises the modified equilibrium distributions meaning the
force gets shifted by minus one half multiplied with the collision operator
"""
def __init__(self, force, rho=1):
self._force = force
self._rho = rho
def __call__(self, lb_method):
stencil = lb_method.stencil
force_symp = sp.symbols("force_:{}".format(lb_method.dim))
simple = Simple(force_symp)
force = [f / self._rho for f in simple(lb_method)]
moment_matrix = lb_method.moment_matrix
relaxation_rates = sp.Matrix(np.diag(lb_method.relaxation_rates))
mrt_collision_op = moment_matrix.inv() * relaxation_rates * moment_matrix
result = -0.5 * mrt_collision_op * sp.Matrix(force) + sp.Matrix(force)
for i in range(0, len(stencil)):
result[i] = result[i].simplify()
subs_dict = dict(zip(force_symp, self._force))
for i in range(0, len(stencil)):
result[i] = result[i].subs(subs_dict)
return result
from pystencils.field import Field
from lbmpy.creationfunctions import update_with_default_parameters
from lbmpy.fieldaccess import StreamPushTwoFieldsAccessor, CollideOnlyInplaceAccessor
from pystencils.fd.derivation import FiniteDifferenceStencilDerivation
from lbmpy.maxwellian_equilibrium import get_weights
from pystencils import Assignment, AssignmentCollection
from lbmpy.phasefield_allen_cahn.derivatives import laplacian, isotropic_gradient
import sympy as sp
import numpy as np
def chemical_potential_symbolic(phi_field, stencil, beta, kappa):
r"""
Get a symbolic expression for the chemical potential according to equation (5) in PhysRevE.96.053301.
Args:
phi_field: the phase-field on which the chemical potential is applied
stencil: stencil of the lattice Boltzmann step
beta: coefficient related to surface tension and interface thickness
kappa: coefficient related to surface tension and interface thickness
"""
dimensions = len(stencil[0])
deriv = FiniteDifferenceStencilDerivation((0, 0), stencil)
# assume the stencil is symmetric
deriv.assume_symmetric(0)
deriv.assume_symmetric(1)
if dimensions == 3:
deriv.assume_symmetric(2)
# set weights for missing degrees of freedom in the calculation
if len(stencil) == 9:
deriv.set_weight((1, 1), sp.Rational(1, 6))
deriv.set_weight((0, -1), sp.Rational(2, 3))
deriv.set_weight((0, 0), sp.Rational(-10, 3))
# assume the stencil is isotropic
res = deriv.get_stencil(isotropic=True)
lap = res.apply(phi_field.center)
else:
lap = laplacian(phi_field)
# get the chemical potential
mu = 4.0 * beta * phi_field.center * (phi_field.center - 1.0) * (phi_field.center - 0.5) - \
kappa * lap
return mu
def isotropic_gradient_symbolic(phi_field, stencil):
r"""
Get a symbolic expression for the isotropic gradient of the phase-field
Args:
phi_field: the phase-field on which the isotropic gradient is applied
stencil: stencil of the lattice Boltzmann step
"""
dimensions = len(stencil[0])
deriv = FiniteDifferenceStencilDerivation((0,), stencil)
deriv.assume_symmetric(0, anti_symmetric=True)
deriv.assume_symmetric(1, anti_symmetric=False)
if dimensions == 3:
deriv.assume_symmetric(2, anti_symmetric=False)
if len(stencil) == 9:
res = deriv.get_stencil(isotropic=True)
grad = [res.apply(phi_field.center), res.rotate_weights_and_apply(phi_field.center, (0, 1)), 0]
elif len(stencil) == 19:
deriv.set_weight((0, 0, 0), sp.Integer(0))
deriv.set_weight((1, 0, 0), sp.Rational(1, 6))
deriv.set_weight((1, 1, 0), sp.Rational(1, 12))
res = deriv.get_stencil(isotropic=True)
grad = [res.apply(phi_field.center),
res.rotate_weights_and_apply(phi_field.center, (1, 0)),
res.rotate_weights_and_apply(phi_field.center, (2, 1))]
else:
grad = isotropic_gradient(phi_field)
return grad
def normalized_isotropic_gradient_symbolic(phi_field, stencil):
r"""
Get a symbolic expression for the normalized isotropic gradient of the phase-field
Args:
phi_field: the phase-field on which the normalized isotropic gradient is applied
stencil: stencil of the lattice Boltzmann step
"""
iso_grad = isotropic_gradient_symbolic(phi_field, stencil)
tmp = (sum(map(lambda x: x * x, isotropic_gradient_symbolic(phi_field, stencil))) + 1.e-32) ** 0.5
result = [x / tmp for x in iso_grad]
return result
def pressure_force(phi_field, stencil, density_heavy, density_light):
r"""
Get a symbolic expression for the pressure force
Args:
phi_field: phase-field
stencil: stencil of the lattice Boltzmann step
density_heavy: density of the heavier fluid
density_light: density of the lighter fluid
"""
iso_grad = isotropic_gradient_symbolic(phi_field, stencil)
result = list(map(lambda x: sp.Rational(-1, 3) * sp.symbols("rho") * (density_heavy - density_light) * x, iso_grad))
return result
def viscous_force(lb_velocity_field, phi_field, mrt_method, tau, density_heavy, density_light):
r"""
Get a symbolic expression for the viscous force
Args:
lb_velocity_field: hydrodynamic distribution function
phi_field: phase-field
mrt_method: mrt lattice boltzmann method used for hydrodynamics
tau: relaxation time of the hydrodynamic lattice boltzmann step
density_heavy: density of the heavier fluid
density_light: density of the lighter fluid
"""
stencil = mrt_method.stencil
dimensions = len(stencil[0])
iso_grad = isotropic_gradient_symbolic(phi_field, stencil)
moment_matrix = mrt_method.moment_matrix
rel = mrt_method.relaxation_rates
eq = mrt_method.moment_equilibrium_values
eq = np.array(eq)
g_vals = [lb_velocity_field.center(i) for i, _ in enumerate(stencil)]
m0 = np.dot(moment_matrix, g_vals)
m = m0 - eq
m = m * rel
non_equilibrium = np.dot(moment_matrix.inv(), m)
stress_tensor = [0] * 6
# Calculate Stress Tensor MRT
for i, d in enumerate(stencil):
stress_tensor[0] = sp.Add(stress_tensor[0], non_equilibrium[i] * (d[0] * d[0]))
stress_tensor[1] = sp.Add(stress_tensor[1], non_equilibrium[i] * (d[1] * d[1]))
if dimensions == 3:
stress_tensor[2] = sp.Add(stress_tensor[2], non_equilibrium[i] * (d[2] * d[2]))
stress_tensor[3] = sp.Add(stress_tensor[3], non_equilibrium[i] * (d[1] * d[2]))
stress_tensor[4] = sp.Add(stress_tensor[4], non_equilibrium[i] * (d[0] * d[2]))
stress_tensor[5] = sp.Add(stress_tensor[5], non_equilibrium[i] * (d[0] * d[1]))
density_difference = density_heavy - density_light
# Calculate Viscous Force MRT
fmx = (0.5 - tau) * (stress_tensor[0] * iso_grad[0]
+ stress_tensor[5] * iso_grad[1]
+ stress_tensor[4] * iso_grad[2]) * density_difference
fmy = (0.5 - tau) * (stress_tensor[5] * iso_grad[0]
+ stress_tensor[1] * iso_grad[1]
+ stress_tensor[3] * iso_grad[2]) * density_difference
fmz = (0.5 - tau) * (stress_tensor[4] * iso_grad[0]
+ stress_tensor[3] * iso_grad[1]
+ stress_tensor[2] * iso_grad[2]) * density_difference
return [fmx, fmy, fmz]
def surface_tension_force(phi_field, stencil, beta, kappa):
r"""
Get a symbolic expression for the surface tension force
Args:
phi_field: the phase-field on which the chemical potential is applied
stencil: stencil of the lattice Boltzmann step
beta: coefficient related to surface tension and interface thickness
kappa: coefficient related to surface tension and interface thickness
"""
chemical_potential = chemical_potential_symbolic(phi_field, stencil, beta, kappa)
iso_grad = isotropic_gradient_symbolic(phi_field, stencil)
return [chemical_potential * x for x in iso_grad]
def hydrodynamic_force(lb_velocity_field, phi_field, lb_method, tau,
density_heavy, density_light, kappa, beta, body_force):
r"""
Get a symbolic expression for the hydrodynamic force
Args:
lb_velocity_field: hydrodynamic distribution function
phi_field: phase-field
lb_method: Lattice boltzmann method used for hydrodynamics
tau: relaxation time of the hydrodynamic lattice boltzmann step
density_heavy: density of the heavier fluid
density_light: density of the lighter fluid
beta: coefficient related to surface tension and interface thickness
kappa: coefficient related to surface tension and interface thickness
body_force: force acting on the fluids. Usually the gravity
"""
stencil = lb_method.stencil
dimensions = len(stencil[0])
fp = pressure_force(phi_field, stencil, density_heavy, density_light)
fm = viscous_force(lb_velocity_field, phi_field, lb_method, tau, density_heavy, density_light)
fs = surface_tension_force(phi_field, stencil, beta, kappa)
result = []
for i in range(dimensions):
result.append(fs[i] + fp[i] + fm[i] + body_force[i])
return result
def interface_tracking_force(phi_field, stencil, interface_thickness):
r"""
Get a symbolic expression for the hydrodynamic force
Args:
phi_field: phase-field
stencil: stencil of the phase-field distribution lattice Boltzmann step
interface_thickness: interface thickness
"""
dimensions = len(stencil[0])
normal_fd = normalized_isotropic_gradient_symbolic(phi_field, stencil)
result = []
for i in range(dimensions):
result.append(((1.0 - 4.0 * (phi_field.center - 0.5) ** 2.0) / interface_thickness) * normal_fd[i])
return result
def get_update_rules_velocity(src_field, u_in, lb_method, force, density):
r"""
Get assignments to update the velocity with a force shift
Args:
src_field: the source field of the hydrodynamic distribution function
u_in: velocity field
lb_method: mrt lattice boltzmann method used for hydrodynamics
force: force acting on the hydrodynamic lb step
density: the interpolated density of the simulation
"""
stencil = lb_method.stencil
dimensions = len(stencil[0])
moment_matrix = lb_method.moment_matrix
eq = lb_method.moment_equilibrium_values
first_eqs = lb_method.first_order_equilibrium_moment_symbols
indices = list()
for i in range(dimensions):
indices.append(eq.index(first_eqs[i]))
src = [src_field.center(i) for i, _ in enumerate(stencil)]
m0 = np.dot(moment_matrix, src)
update_u = list()
update_u.append(Assignment(sp.symbols("rho"), m0[0]))
u_symp = sp.symbols("u_:{}".format(dimensions))
zw = sp.symbols("zw_:{}".format(dimensions))
for i in range(dimensions):
update_u.append(Assignment(zw[i], u_in.center_vector[i]))
subs_dict = dict(zip(u_symp, zw))
for i in range(dimensions):
update_u.append(Assignment(u_in.center_vector[i], m0[indices[i]] + force[i].subs(subs_dict) / density / 2))
return update_u
def get_collision_assignments_hydro(collision_rule=None, density=1, optimization={}, **kwargs):
r"""
Get collision assignments for the hydrodynamic lattice Boltzmann step. Here the force gets applied in the moment
space. Afterwards the transformation back to the pdf space happens.
Args:
collision_rule: arbitrary collision rule, e.g. created with create_lb_collision_rule
density: the interpolated density of the simulation
optimization: for details see createfunctions.py
"""
params, opt_params = update_with_default_parameters(kwargs, optimization)
lb_method = params['lb_method']
stencil = lb_method.stencil
dimensions = len(stencil[0])
field_data_type = 'float64' if opt_params['double_precision'] else 'float32'
q = len(stencil)
u_in = params['velocity_input']
force = params['force']
if opt_params['symbolic_field'] is not None:
src_field = opt_params['symbolic_field']
elif opt_params['field_size']:
field_size = [s + 2 for s in opt_params['field_size']] + [q]
src_field = Field.create_fixed_size(params['field_name'], field_size, index_dimensions=1,
layout=opt_params['field_layout'], dtype=field_data_type)
else:
src_field = Field.create_generic(params['field_name'], spatial_dimensions=collision_rule.method.dim,
index_shape=(q,), layout=opt_params['field_layout'], dtype=field_data_type)
if opt_params['symbolic_temporary_field'] is not None:
dst_field = opt_params['symbolic_temporary_field']
else:
dst_field = src_field.new_field_with_different_name(params['temporary_field_name'])
moment_matrix = lb_method.moment_matrix
rel = lb_method.relaxation_rates
eq = lb_method.moment_equilibrium_values
first_eqs = lb_method.first_order_equilibrium_moment_symbols
indices = list()
for i in range(dimensions):
indices.append(eq.index(first_eqs[i]))
eq = np.array(eq)
g_vals = [src_field.center(i) for i, _ in enumerate(stencil)]
m0 = np.dot(moment_matrix, g_vals)
mf = np.zeros(len(stencil), dtype=object)
for i in range(dimensions):
mf[indices[i]] = force[i] / density
m = sp.symbols("m_:{}".format(len(stencil)))
update_m = get_update_rules_velocity(src_field, u_in, lb_method, force, density)
u_symp = sp.symbols("u_:{}".format(dimensions))
for i in range(dimensions):
update_m.append(Assignment(u_symp[i], u_in.center_vector[i]))
for i in range(0, len(stencil)):
update_m.append(Assignment(m[i], m0[i] - (m0[i] - eq[i] + mf[i] / 2) * rel[i] + mf[i]))
update_g = list()
var = np.dot(moment_matrix.inv(), m)
if params['kernel_type'] == 'collide_stream_push':
push_accessor = StreamPushTwoFieldsAccessor()
post_collision_accesses = push_accessor.write(dst_field, stencil)
else:
collide_accessor = CollideOnlyInplaceAccessor()
post_collision_accesses = collide_accessor.write(src_field, stencil)
for i in range(0, len(stencil)):
update_g.append(Assignment(post_collision_accesses[i], var[i]))
hydro_lb_update_rule = AssignmentCollection(main_assignments=update_g,
subexpressions=update_m)
return hydro_lb_update_rule
def initializer_kernel_phase_field_lb(phi_field_distributions, phi_field, velocity, mrt_method, interface_thickness):
r"""
Returns an assignment list for initializing the phase-field distribution functions
Args:
phi_field_distributions: source field of phase-field distribution function
phi_field: phase-field
velocity: velocity field
mrt_method: lattice Boltzmann method of the phase-field lattice Boltzmann step
interface_thickness: interface thickness
"""
stencil = mrt_method.stencil
dimensions = len(stencil[0])
weights = get_weights(stencil, c_s_sq=sp.Rational(1, 3))
u_symp = sp.symbols("u_:{}".format(dimensions))
normal_fd = normalized_isotropic_gradient_symbolic(phi_field, stencil)
gamma = mrt_method.get_equilibrium_terms()
gamma = gamma.subs({sp.symbols("rho"): 1})
gamma_init = gamma.subs({x: y for x, y in zip(u_symp, velocity.center_vector)})
# create the kernels for the initialization of the h field
h_updates = list()
def scalar_product(a, b):
return sum(a_i * b_i for a_i, b_i in zip(a, b))
f = []
for i, d in enumerate(stencil):
f.append(weights[i] * ((1.0 - 4.0 * (phi_field.center - 0.5) ** 2.0) / interface_thickness)
* scalar_product(d, normal_fd[0:dimensions]))
for i, _ in enumerate(stencil):
h_updates.append(Assignment(phi_field_distributions.center(i), phi_field.center * gamma_init[i] - 0.5 * f[i]))
return h_updates
def initializer_kernel_hydro_lb(velocity_distributions, velocity, mrt_method):
r"""
Returns an assignment list for initializing the velocity distribution functions
Args:
velocity_distributions: source field of velocity distribution function
velocity: velocity field
mrt_method: lattice Boltzmann method of the hydrodynamic lattice Boltzmann step
"""
stencil = mrt_method.stencil
dimensions = len(stencil[0])
weights = get_weights(stencil, c_s_sq=sp.Rational(1, 3))
u_symp = sp.symbols("u_:{}".format(dimensions))
gamma = mrt_method.get_equilibrium_terms()
gamma = gamma.subs({sp.symbols("rho"): 1})
gamma_init = gamma.subs({x: y for x, y in zip(u_symp, velocity.center_vector)})
g_updates = list()
for i, _ in enumerate(stencil):
g_updates.append(Assignment(velocity_distributions.center(i), gamma_init[i] - weights[i]))
return g_updates
import math
def calculate_parameters_rti(reference_length=256,
reference_time=16000,
density_heavy=1.0,
capillary_number=0.26,
reynolds_number=3000,
atwood_number=0.5,
peclet_number=1000,
density_ratio=3,
viscosity_ratio=1):
r"""
Calculate the simulation parameters for the Rayleigh-Taylor instability.
The calculation is done according to the description in part B of PhysRevE.96.053301.
Args:
reference_length: reference length of the RTI
reference_time: chosen reference time
density_heavy: density of the heavier fluid
capillary_number: capillary number of the simulation represents the relative effect of viscous drag
forces versus surface tension forces
reynolds_number: reynolds number of the simulation is the ratio between the viscous forces in a fluid
and the inertial forces
atwood_number: atwood number of the simulation is a dimensionless density ratio
peclet_number: peclet number of the simulation is the ratio of the rate of advection
of a physical quantity by the flow to the rate of diffusion of the same quantity
driven by an appropriate gradient
density_ratio: density ratio of the heavier and the lighter fluid
viscosity_ratio: viscosity ratio of the heavier and the lighter fluid
"""
# calculate the gravitational acceleration and the reference velocity
g = reference_length / (reference_time ** 2 * atwood_number)
reference_velocity = math.sqrt(g * reference_length)
dynamic_viscosity_heavy = (density_heavy * reference_velocity * reference_length) / reynolds_number
dynamic_viscosity_light = dynamic_viscosity_heavy / viscosity_ratio
density_light = density_heavy / density_ratio
kinematic_viscosity_heavy = dynamic_viscosity_heavy / density_heavy
kinematic_viscosity_light = dynamic_viscosity_light / density_light
relaxation_time_heavy = 3.0 * kinematic_viscosity_heavy
relaxation_time_light = 3.0 * kinematic_viscosity_light
surface_tension = (dynamic_viscosity_heavy * reference_velocity) / capillary_number
mobility = (reference_velocity * reference_length) / peclet_number
parameters = {
"density_light": density_light,
"dynamic_viscosity_heavy": dynamic_viscosity_heavy,
"dynamic_viscosity_light": dynamic_viscosity_light,
"relaxation_time_heavy": relaxation_time_heavy,
"relaxation_time_light": relaxation_time_light,
"gravitational_acceleration": -g,
"mobility": mobility,
"surface_tension": surface_tension
}
return parameters
def calculate_dimensionless_rising_bubble(reference_time=18000,
density_heavy=1.0,
bubble_radius=16,
bond_number=1,
reynolds_number=40,
density_ratio=1000,
viscosity_ratio=100):
r"""
Calculate the simulation parameters for a rising bubble. The parameter calculation follows the ideas of Weber and
is based on the Reynolds number. This means the rising velocity of the bubble is implicitly stated with the
Reynolds number
Args:
reference_time: chosen reference time
density_heavy: density of the heavier fluid
bubble_radius: initial radius of the rising bubble
bond_number: also called eötvös number is measuring the importance of gravitational forces compared to
surface tension forces
reynolds_number: reynolds number of the simulation is the ratio between the viscous forces in a fluid
and the inertial forces
density_ratio: density ratio of the heavier and the lighter fluid
viscosity_ratio: viscosity ratio of the heavier and the lighter fluid
"""
bubble_diameter = bubble_radius * 2
g = bubble_diameter / (reference_time ** 2)
density_light = density_heavy / density_ratio
dynamic_viscosity_heavy = (density_heavy * math.sqrt(g * bubble_diameter ** 3)) / reynolds_number
dynamic_viscosity_light = dynamic_viscosity_heavy / viscosity_ratio
kinematic_viscosity_heavy = dynamic_viscosity_heavy / density_heavy
kinematic_viscosity_light = dynamic_viscosity_light / density_light
relaxation_time_heavy = 3 * kinematic_viscosity_heavy
relaxation_time_light = 3 * kinematic_viscosity_light
surface_tension = (density_heavy - density_light) * g * bubble_diameter ** 2 / bond_number
# calculation of the Morton number
# Mo = gravitational_acceleration * dynamic_viscosity_heavy / (density_heavy * surface_tension ** 3)
parameters = {
"density_light": density_light,
"dynamic_viscosity_heavy": dynamic_viscosity_heavy,
"dynamic_viscosity_light": dynamic_viscosity_light,
"relaxation_time_heavy": relaxation_time_heavy,
"relaxation_time_light": relaxation_time_light,
"gravitational_acceleration": -g,
"surface_tension": surface_tension
}
return parameters
import sympy as sp
from lbmpy.innerloopsplit import create_lbm_split_groups
from lbmpy.methods.cumulantbased import CumulantBasedLbMethod
from pystencils.simp import (
add_subexpressions_for_divisions, apply_to_all_assignments,
subexpression_substitution_in_main_assignments, sympy_cse)
def create_simplification_strategy(lb_method, cse_pdfs=False, cse_global=False, split_inner_loop=False):
from pystencils.simp import SimplificationStrategy
from lbmpy.methods.momentbased import MomentBasedLbMethod
from lbmpy.methods.momentbasedsimplifications import replace_second_order_velocity_products, \
factor_density_after_factoring_relaxation_times, factor_relaxation_rates, cse_in_opposing_directions, \
replace_common_quadratic_and_constant_term, replace_density_and_velocity
s = SimplificationStrategy()
expand = apply_to_all_assignments(sp.expand)
if isinstance(lb_method, MomentBasedLbMethod):
s.add(expand)
s.add(replace_second_order_velocity_products)
s.add(expand)
s.add(factor_relaxation_rates)
s.add(replace_density_and_velocity)
s.add(replace_common_quadratic_and_constant_term)
s.add(factor_density_after_factoring_relaxation_times)
s.add(subexpression_substitution_in_main_assignments)
if split_inner_loop:
s.add(create_lbm_split_groups)
elif isinstance(lb_method, CumulantBasedLbMethod):
s.add(expand)
s.add(factor_relaxation_rates)
s.add(add_subexpressions_for_divisions)
if cse_pdfs:
s.add(cse_in_opposing_directions)
if cse_global:
s.add(sympy_cse)
return s
def get_stencil(name, ordering='walberla'):
"""
Stencils are tuples of discrete velocities. They are commonly labeled in the 'DxQy' notation, where d is the
dimension (length of the velocity tuples) and y is number of discrete velocities.
Args:
name: DxQy notation
ordering: the LBM literature does not use a common order of the discrete velocities, therefore here
different common orderings are available. All orderings lead to the same method, it just has
to be used consistently. Here more orderings are available to compare intermediate results with
the literature.
"""
try:
return get_stencil.data[name.upper()][ordering.lower()]
except KeyError:
err_msg = ""
for stencil, ordering_names in get_stencil.data.items():
err_msg += " %s: %s\n" % (stencil, ", ".join(ordering_names.keys()))
raise ValueError("No such stencil available. "
"Available stencils: <stencil_name>( <ordering_names> )\n" + err_msg)
get_stencil.data = {
'D2Q9': {
'walberla': ((0, 0),
(0, 1), (0, -1), (-1, 0), (1, 0),
(-1, 1), (1, 1), (-1, -1), (1, -1),),
'counterclockwise': ((0, 0),
(1, 0), (0, 1), (-1, 0), (0, -1),
(1, 1), (-1, 1), (-1, -1), (1, -1)),
'braunschweig': ((0, 0),
(-1, 1), (-1, 0), (-1, -1), (0, -1),
(1, -1), (1, 0), (1, 1), (0, 1)),
'uk': ((0, 0),
(1, 0), (-1, 0), (0, 1), (0, -1),
(1, 1), (-1, -1), (-1, 1), (1, -1),
)
},
'D2V17': {
'walberla': (
(0, 0), (0, -1), (-1, 0), (1, 0), (0, 1), (-1, -1), (1, -1), (-1, 1), (1, 1), (-2, -2), (2, -2), (-2, 2),
(2, 2), (0, -3), (-3, 0), (3, 0), (0, 3)),
},
'D2V37': {
'walberla': (
(0, 0), (0, -1), (-1, 0), (1, 0), (0, 1), (-1, -1), (1, -1), (-1, 1), (1, 1), (0, -2), (-2, 0), (2, 0),
(0, 2), (-1, -2), (1, -2), (-2, -1), (2, -1), (-2, 1), (2, 1), (-1, 2), (1, 2), (-2, -2), (2, -2), (-2, 2),
(2, 2), (0, -3), (-3, 0), (3, 0), (0, 3), (-1, -3), (1, -3), (-3, -1), (3, -1), (-3, 1), (3, 1), (-1, 3),
(1, 3))
},
'D3Q15': {
'walberla':
((0, 0, 0),
(0, 1, 0), (0, -1, 0), (-1, 0, 0), (1, 0, 0), (0, 0, 1), (0, 0, -1),
(1, 1, 1), (-1, 1, 1), (1, -1, 1), (-1, -1, 1), (1, 1, -1), (-1, 1, -1), (1, -1, -1), (-1, -1, -1)),
'premnath': ((0, 0, 0),
(1, 0, 0), (-1, 0, 0), (0, 1, 0), (0, -1, 0), (0, 0, 1), (0, 0, -1),
(1, 1, 1), (-1, 1, 1), (1, -1, 1), (-1, -1, 1),
(1, 1, -1), (-1, 1, -1), (1, -1, -1), (-1, -1, -1)),
},
'D3Q19': {
'walberla': ((0, 0, 0),
(0, 1, 0), (0, -1, 0), (-1, 0, 0), (1, 0, 0), (0, 0, 1), (0, 0, -1),
(-1, 1, 0), (1, 1, 0), (-1, -1, 0), (1, -1, 0),
(0, 1, 1), (0, -1, 1), (-1, 0, 1), (1, 0, 1),
(0, 1, -1), (0, -1, -1), (-1, 0, -1), (1, 0, -1)),
'braunschweig': ((0, 0, 0),
(1, 0, 0), (-1, 0, 0),
(0, 1, 0), (0, -1, 0),
(0, 0, 1), (0, 0, -1),
(1, 1, 0), (-1, -1, 0),
(1, -1, 0), (-1, 1, 0),
(1, 0, 1), (-1, 0, -1),
(1, 0, -1), (-1, 0, 1),
(0, 1, 1), (0, -1, -1),
(0, 1, -1), (0, -1, 1)),
'premnath': ((0, 0, 0),
(1, 0, 0), (-1, 0, 0), (0, 1, 0), (0, -1, 0), (0, 0, 1), (0, 0, -1),
(1, 1, 0), (-1, 1, 0), (1, -1, 0), (-1, -1, 0),
(1, 0, 1), (-1, 0, 1), (1, 0, -1), (-1, 0, -1),
(0, 1, 1), (0, -1, 1), (0, 1, -1), (0, -1, -1)),
},
'D3Q27': {
'walberla': ((0, 0, 0),
(0, 1, 0), (0, -1, 0), (-1, 0, 0), (1, 0, 0), (0, 0, 1), (0, 0, -1),
(-1, 1, 0), (1, 1, 0), (-1, -1, 0), (1, -1, 0),
(0, 1, 1), (0, -1, 1), (-1, 0, 1), (1, 0, 1),
(0, 1, -1), (0, -1, -1), (-1, 0, -1), (1, 0, -1),
(1, 1, 1), (-1, 1, 1), (1, -1, 1), (-1, -1, 1), (1, 1, -1), (-1, 1, -1), (1, -1, -1),
(-1, -1, -1)),
'premnath': ((0, 0, 0),
(1, 0, 0), (-1, 0, 0), (0, 1, 0), (0, -1, 0), (0, 0, 1), (0, 0, -1),
(1, 1, 0), (-1, 1, 0), (1, -1, 0), (-1, -1, 0),
(1, 0, 1), (-1, 0, 1), (1, 0, -1), (-1, 0, -1),
(0, 1, 1), (0, -1, 1), (0, 1, -1), (0, -1, -1),
(1, 1, 1), (-1, 1, 1), (1, -1, 1), (-1, -1, 1),
(1, 1, -1), (-1, 1, -1), (1, -1, -1), (-1, -1, -1)),
'fakhari': ((0, 0, 0),
(1, 0, 0), (-1, 0, 0), (0, 1, 0), (0, -1, 0), (0, 0, 1), (0, 0, -1),
(1, 1, 1), (-1, 1, 1), (1, -1, 1), (-1, -1, 1),
(1, 1, -1), (-1, 1, -1), (1, -1, -1), (-1, -1, -1),
(1, 1, 0), (-1, 1, 0), (1, -1, 0), (-1, -1, 0),
(1, 0, 1), (-1, 0, 1), (1, 0, -1), (-1, 0, -1), (0, 1, 1), (0, -1, 1), (0, 1, -1), (0, -1, -1)),
}
}
import sympy as sp
from lbmpy.relaxationrates import get_shear_relaxation_rate
from pystencils import Assignment
def second_order_moment_tensor(function_values, stencil):
"""Returns (D x D) Matrix of second order moments of the given function where D is the dimension"""
assert len(function_values) == len(stencil)
dim = len(stencil[0])
return sp.Matrix(dim, dim, lambda i, j: sum(c[i] * c[j] * f for f, c in zip(function_values, stencil)))
def frobenius_norm(matrix, factor=1):
"""Computes the Frobenius norm of a matrix defined as the square root of the sum of squared matrix elements
The optional factor is added inside the square root"""
return sp.sqrt(sum(i * i for i in matrix) * factor)
def add_smagorinsky_model(collision_rule, smagorinsky_constant, omega_output_field=None):
method = collision_rule.method
omega_s = get_shear_relaxation_rate(method)
f_neq = sp.Matrix(method.pre_collision_pdf_symbols) - method.get_equilibrium_terms()
tau_0 = sp.Symbol("tau_0_")
second_order_neq_moments = sp.Symbol("Pi")
rho = method.zeroth_order_equilibrium_moment_symbol if method.conserved_quantity_computation.compressible else 1
adapted_omega = sp.Symbol("smagorinsky_omega")
collision_rule = collision_rule.new_with_substitutions({omega_s: adapted_omega})
# for derivation see notebook demo_custom_LES_model.ipynb
eqs = [Assignment(tau_0, 1 / omega_s),
Assignment(second_order_neq_moments,
frobenius_norm(second_order_moment_tensor(f_neq, method.stencil), factor=2) / rho),
Assignment(adapted_omega,
2 / (tau_0 + sp.sqrt(18 * smagorinsky_constant ** 2 * second_order_neq_moments + tau_0 ** 2)))]
collision_rule.subexpressions += eqs
collision_rule.topological_sort(sort_subexpressions=True, sort_main_assignments=False)
if omega_output_field:
collision_rule.main_assignments.append(Assignment(omega_output_field.center, adapted_omega))
return collision_rule