Commit 036fe134 authored by Helen Schottenhamml's avatar Helen Schottenhamml
Browse files

Merge branch 'FreeSlip' into 'master'

FreeSlip

Closes #3

See merge request pycodegen/lbmpy!89
parents 69c89003 b1250a85
Pipeline #33183 passed with stages
in 32 minutes and 41 seconds
......@@ -231,3 +231,31 @@ class NeighbourOffsetArrays(CustomCodeNode):
offset_symbols = NeighbourOffsetArrays._offset_symbols(dim)
super(NeighbourOffsetArrays, self).__init__(code, symbols_read=set(),
symbols_defined=set(offset_symbols))
class MirroredStencilDirections(CustomCodeNode):
@staticmethod
def mirror_stencil(direction, mirror_axis):
assert mirror_axis <= len(direction), f"only {len(direction)} axis available for mirage"
direction = list(direction)
direction[mirror_axis] = -direction[mirror_axis]
return tuple(direction)
@staticmethod
def _mirrored_symbol(mirror_axis):
axis = ['x', 'y', 'z']
return TypedSymbol(f"{axis[mirror_axis]}_axis_mirrored_stencil_dir", create_type(np.int64))
def __init__(self, stencil, mirror_axis, dtype=np.int64):
offsets_dtype = create_type(dtype)
mirrored_stencil_symbol = MirroredStencilDirections._mirrored_symbol(mirror_axis)
mirrored_directions = [stencil.index(MirroredStencilDirections.mirror_stencil(direction, mirror_axis))
for direction in stencil]
code = "\n"
code += _array_pattern(offsets_dtype, mirrored_stencil_symbol.name, mirrored_directions)
super(MirroredStencilDirections, self).__init__(code, symbols_read=set(),
symbols_defined={mirrored_stencil_symbol})
from lbmpy.boundaries.boundaryconditions import (
UBB, FixedDensity, DiffusionDirichlet, SimpleExtrapolationOutflow,
ExtrapolationOutflow, NeumannByCopy, NoSlip, StreamInConstant)
ExtrapolationOutflow, NeumannByCopy, NoSlip, StreamInConstant, FreeSlip)
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling
__all__ = ['NoSlip', 'UBB', 'SimpleExtrapolationOutflow', 'ExtrapolationOutflow',
__all__ = ['NoSlip', 'FreeSlip', 'UBB', 'SimpleExtrapolationOutflow', 'ExtrapolationOutflow',
'FixedDensity', 'DiffusionDirichlet', 'NeumannByCopy',
'LatticeBoltzmannBoundaryHandling', 'StreamInConstant']
from warnings import warn
from lbmpy.advanced_streaming.utility import AccessPdfValues, Timestep
from pystencils.simp.assignment_collection import AssignmentCollection
from pystencils import Assignment, Field
......@@ -5,7 +7,7 @@ from lbmpy.boundaries.boundaryhandling import LbmWeightInfo
from pystencils.data_types import create_type
from pystencils.sympyextensions import get_symmetric_part
from lbmpy.simplificationfactory import create_simplification_strategy
from lbmpy.advanced_streaming.indexing import NeighbourOffsetArrays
from lbmpy.advanced_streaming.indexing import NeighbourOffsetArrays, MirroredStencilDirections
from pystencils.stencil import offset_to_direction_string, direction_string_to_offset, inverse_direction
import sympy as sp
......@@ -107,8 +109,126 @@ class NoSlip(LbBoundary):
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field):
return Assignment(f_in(inv_dir[dir_symbol]), f_out(dir_symbol))
# end class NoSlip
class FreeSlip(LbBoundary):
"""
Free-Slip boundary condition, which enforces a zero normal fluid velocity $u_n = 0$ but places no restrictions
on the tangential fluid velocity $u_t$.
Args:
stencil: LBM stencil which is used for the simulation
normal_direction: optional normal direction. If the Free slip boundary is applied to a certain side in the
domain it is not necessary to calculate the normal direction since it can be stated for all
boundary cells. This reduces the memory space for the index array significantly.
name: optional name of the boundary.
"""
def __init__(self, stencil, normal_direction=None, name=None):
"""Set an optional name here, to mark boundaries, for example for force evaluations"""
self.stencil = stencil
if normal_direction and len(normal_direction) - normal_direction.count(0) != 1:
raise ValueError("It is only possible to pre specify the normal direction for simple situations."
"This means if the free slip boundary is applied to a straight wall or side in the "
"simulation domain. A possible value for example would be (0, 1, 0) if the "
"free slip boundary is applied to the northern wall. For more complex situations "
"the normal direction has to be calculated for each cell. This is done when "
"the normal direction is not defined for this class")
if normal_direction:
self.mirror_axis = normal_direction.index(*[dir for dir in normal_direction if dir != 0])
self.normal_direction = normal_direction
self.dim = len(stencil[0])
if name is None and normal_direction:
name = f"Free Slip : {offset_to_direction_string([-x for x in normal_direction])}"
super(FreeSlip, self).__init__(name)
def init_callback(self, boundary_data, **_):
if len(boundary_data.index_array) > 1e6:
warn(f"The calculation of the normal direction for each cell might take a long time, because "
f"{len(boundary_data.index_array)} cells are marked as Free Slip boundary cells. Consider specifying "
f" the normal direction beforehand, which is possible if it is equal for all cells (e.g. at a wall)")
dim = boundary_data.dim
coords = [coord for coord, _ in zip(['x', 'y', 'z'], range(dim))]
boundary_cells = set()
# get a set containing all boundary cells
for cell in boundary_data.index_array:
fluid_cell = tuple([cell[coord] for coord in coords])
direction = self.stencil[cell['dir']]
boundary_cell = tuple([i + d for i, d in zip(fluid_cell, direction)])
boundary_cells.add(boundary_cell)
for cell in boundary_data.index_array:
fluid_cell = tuple([cell[coord] for coord in coords])
direction = self.stencil[cell['dir']]
ref_direction = direction
normal_direction = [0] * dim
for i in range(dim):
sub_direction = [0] * dim
sub_direction[i] = direction[i]
test_cell = tuple([x + y for x, y in zip(fluid_cell, sub_direction)])
if test_cell in boundary_cells:
normal_direction[i] = direction[i]
ref_direction = MirroredStencilDirections.mirror_stencil(ref_direction, i)
ref_direction = inverse_direction(ref_direction)
for i, cell_name in zip(range(dim), self.additional_data):
cell[cell_name[0]] = -normal_direction[i]
cell['ref_dir'] = self.stencil.index(ref_direction)
@property
def additional_data(self):
"""Used internally only. For the FreeSlip boundary the information of the normal direction for each pdf
direction is needed. This information is stored in the index vector."""
if self.normal_direction:
return []
else:
data_type = create_type('int32')
wnz = [] if self.dim == 2 else [('wnz', data_type)]
data = [('wnx', data_type), ('wny', data_type)] + wnz
return data + [('ref_dir', data_type)]
@property
def additional_data_init_callback(self):
if self.normal_direction:
return None
else:
return self.init_callback
def get_additional_code_nodes(self, lb_method):
if self.normal_direction:
return [MirroredStencilDirections(self.stencil, self.mirror_axis)]
else:
return []
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field):
if self.normal_direction:
normal_direction = self.normal_direction
mirrored_stencil_symbol = MirroredStencilDirections._mirrored_symbol(self.mirror_axis)
mirrored_direction = inv_dir[sp.IndexedBase(mirrored_stencil_symbol, shape=(1,))[dir_symbol]]
else:
normal_direction = list()
for i, cell_name in zip(range(self.dim), self.additional_data):
normal_direction.append(index_field[0](cell_name[0]))
normal_direction = tuple(normal_direction)
mirrored_direction = index_field[0]('ref_dir')
return Assignment(f_in(inv_dir[dir_symbol]), f_in[normal_direction](mirrored_direction))
# end class FreeSlip
class UBB(LbBoundary):
"""Velocity bounce back boundary condition, enforcing specified velocity at obstacle
......@@ -173,12 +293,11 @@ class UBB(LbBoundary):
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field):
vel_from_idx_field = callable(self._velocity)
vel = [index_field(f'vel_{i}') for i in range(self.dim)] if vel_from_idx_field else self._velocity
direction = dir_symbol
assert self.dim == lb_method.dim, \
f"Dimension of UBB ({self.dim}) does not match dimension of method ({lb_method.dim})"
neighbor_offset = NeighbourOffsetArrays.neighbour_offset(direction, lb_method.stencil)
neighbor_offset = NeighbourOffsetArrays.neighbour_offset(dir_symbol, lb_method.stencil)
velocity = tuple(v_i.get_shifted(*neighbor_offset)
if isinstance(v_i, Field.Access) and not vel_from_idx_field
......@@ -193,7 +312,7 @@ class UBB(LbBoundary):
c_s_sq = sp.Rational(1, 3)
weight_of_direction = LbmWeightInfo.weight_of_direction
vel_term = 2 / c_s_sq * sum([d_i * v_i for d_i, v_i in zip(neighbor_offset, velocity)]) * weight_of_direction(
direction, lb_method)
dir_symbol, lb_method)
# Better alternative: in conserved value computation
# rename what is currently called density to "virtual_density"
......@@ -205,12 +324,13 @@ class UBB(LbBoundary):
density_equations = cqc.output_equations_from_pdfs(pdf_field_accesses, {'density': density_symbol})
density_symbol = lb_method.conserved_quantity_computation.defined_symbols()['density']
result = density_equations.all_assignments
result += [Assignment(f_in(inv_dir[direction]),
f_out(direction) - vel_term * density_symbol)]
result += [Assignment(f_in(inv_dir[dir_symbol]),
f_out(dir_symbol) - vel_term * density_symbol)]
return result
else:
return [Assignment(f_in(inv_dir[direction]),
f_out(direction) - vel_term)]
return [Assignment(f_in(inv_dir[dir_symbol]),
f_out(dir_symbol) - vel_term)]
# end class UBB
......@@ -259,6 +379,7 @@ class SimpleExtrapolationOutflow(LbBoundary):
return Assignment(f_in.center(inv_dir[dir_symbol]), f_out[tangential_offset](inv_dir[dir_symbol]))
# end class SimpleExtrapolationOutflow
......@@ -406,6 +527,7 @@ class ExtrapolationOutflow(LbBoundary):
return AssignmentCollection(boundary_assignments, subexpressions=subexpressions)
# end class ExtrapolationOutflow
......@@ -459,6 +581,7 @@ class FixedDensity(LbBoundary):
return subexpressions + [Assignment(f_in(inv_dir[dir_symbol]),
2 * eq_component - f_out(dir_symbol))]
# end class FixedDensity
......@@ -493,6 +616,7 @@ class DiffusionDirichlet(LbBoundary):
return [Assignment(f_in(inv_dir[dir_symbol]),
2 * w_dir * self.concentration - f_out(dir_symbol))]
# end class DiffusionDirichlet
......@@ -516,6 +640,7 @@ class NeumannByCopy(LbBoundary):
return [Assignment(f_in(inv_dir[dir_symbol]), f_out(inv_dir[dir_symbol])),
Assignment(f_out[neighbour_offset](dir_symbol), f_out(dir_symbol))]
# end class NeumannByCopy
......
......@@ -2,13 +2,23 @@ import numpy as np
import pytest
from lbmpy.boundaries import NoSlip, UBB, SimpleExtrapolationOutflow, ExtrapolationOutflow,\
FixedDensity, DiffusionDirichlet, NeumannByCopy, StreamInConstant
FixedDensity, DiffusionDirichlet, NeumannByCopy, StreamInConstant, FreeSlip
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling
from lbmpy.creationfunctions import create_lb_function, create_lb_method
from lbmpy.geometry import add_box_boundary
from lbmpy.lbstep import LatticeBoltzmannStep
from lbmpy.stencils import get_stencil
from pystencils import create_data_handling, make_slice
from pystencils.slicing import slice_from_direction
from pystencils.stencil import inverse_direction
def mirror_stencil(direction, mirror_axis):
for i, n in enumerate(mirror_axis):
if n != 0:
direction[i] = -direction[i]
return tuple(direction)
@pytest.mark.parametrize("target", ['cpu', 'gpu', 'opencl'])
......@@ -105,6 +115,110 @@ def test_simple(target):
assert (all(dh.cpu_arrays['pdfs'][0, 2:4, 8] == 5))
def test_free_slip_index_list():
stencil = get_stencil('D2Q9')
dh = create_data_handling(domain_size=(4, 4), periodicity=(False, False))
src = dh.add_array('src', values_per_cell=len(stencil), alignment=True)
dh.fill('src', 0.0, ghost_layers=True)
method = create_lb_method(stencil='D2Q9', method='srt', relaxation_rate=1.8)
bh = LatticeBoltzmannBoundaryHandling(method, dh, 'src', name="bh")
free_slip = FreeSlip(stencil=stencil)
add_box_boundary(bh, free_slip)
bh.prepare()
for b in dh.iterate():
for b_obj, idx_arr in b[bh._index_array_name].boundary_object_to_index_list.items():
index_array = idx_arr
# normal directions
normal_west = (1, 0)
normal_east = (-1, 0)
normal_south = (0, 1)
normal_north = (0, -1)
normal_south_west = (1, 1)
normal_north_west = (1, -1)
normal_south_east = (-1, 1)
normal_north_east = (-1, -1)
for cell in index_array:
direction = stencil[cell[2]]
inv_dir = inverse_direction(direction)
boundary_cell = (cell[0] + direction[0], cell[1] + direction[1])
normal = (cell[3], cell[4])
# the data is written on the inverse direction of the fluid cell near the boundary
# the data is read from the mirrored direction of the inverse direction where the mirror axis is the normal
assert cell[5] == stencil.index(mirror_stencil(list(inv_dir), normal))
if boundary_cell[0] == 0 and 0 < boundary_cell[1] < 5:
assert normal == normal_west
if boundary_cell[0] == 5 and 0 < boundary_cell[1] < 5:
assert normal == normal_east
if 0 < boundary_cell[0] < 5 and boundary_cell[1] == 0:
assert normal == normal_south
if 0 < boundary_cell[0] < 5 and boundary_cell[1] == 5:
assert normal == normal_north
if boundary_cell == (0, 0):
assert cell[2] == cell[5]
assert normal == normal_south_west
if boundary_cell == (5, 0):
assert cell[2] == cell[5]
assert normal == normal_south_east
if boundary_cell == (0, 5):
assert cell[2] == cell[5]
assert normal == normal_north_west
if boundary_cell == (5, 5):
assert cell[2] == cell[5]
assert normal == normal_north_east
def test_free_slip_equivalence():
# check if Free slip BC does the same if the normal direction is specified or not
stencil = get_stencil('D2Q9')
dh = create_data_handling(domain_size=(4, 4), periodicity=(False, False))
src1 = dh.add_array('src1', values_per_cell=len(stencil), alignment=True)
src2 = dh.add_array('src2', values_per_cell=len(stencil), alignment=True)
dh.fill('src1', 0.0, ghost_layers=True)
dh.fill('src2', 0.0, ghost_layers=True)
shape = dh.gather_array('src1', ghost_layers=True).shape
num = 0
for x in range(shape[0]):
for y in range(shape[1]):
for direction in range(shape[2]):
dh.cpu_arrays['src1'][x, y, direction] = num
dh.cpu_arrays['src2'][x, y, direction] = num
num += 1
method = create_lb_method(stencil='D2Q9', method='srt', relaxation_rate=1.8)
bh1 = LatticeBoltzmannBoundaryHandling(method, dh, 'src1', name="bh1")
free_slip1 = FreeSlip(stencil=stencil)
bh1.set_boundary(free_slip1, slice_from_direction('N', dh.dim))
bh2 = LatticeBoltzmannBoundaryHandling(method, dh, 'src2', name="bh2")
free_slip2 = FreeSlip(stencil=stencil, normal_direction=(0, -1))
bh2.set_boundary(free_slip2, slice_from_direction('N', dh.dim))
bh1()
bh2()
assert np.array_equal(dh.cpu_arrays['src1'], dh.cpu_arrays['src2'])
def test_exotic_boundaries():
step = LatticeBoltzmannStep((50, 50), relaxation_rate=1.8, compressible=False, periodicity=False)
add_box_boundary(step.boundary_handling, NeumannByCopy())
......
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment