Skip to content
Snippets Groups Projects
Commit 073d8d3c authored by Martin Bauer's avatar Martin Bauer
Browse files

Advection - diffusion abstraction layer

parent 106aa0a7
Branches
Tags
No related merge requests found
...@@ -132,3 +132,172 @@ def __upDownOffsets(d, dim): ...@@ -132,3 +132,172 @@ def __upDownOffsets(d, dim):
coord[d] = -1 coord[d] = -1
down = np.array(coord, dtype=np.int) down = np.array(coord, dtype=np.int)
return up, down return up, down
# --------------------------------------- Advection Diffusion ----------------------------------------------------------
class Advection(sp.Function):
"""Advection term, create with advection(scalarField, vectorField)"""
@property
def scalar(self):
return self.args[0].field
@property
def vector(self):
if isinstance(self.args[1], Field.Access):
return self.args[1].field
else:
return self.args[1]
@property
def scalarIndex(self):
return None if len(self.args) <= 2 else int(self.args[2])
@property
def dim(self):
return self.scalar.spatialDimensions
def _latex(self, printer):
nameSuffix = "_%s" % self.scalarIndex if self.scalarIndex is not None else ""
if isinstance(self.vector, Field):
return r"\nabla \cdot(%s %s)" % (printer.doprint(sp.Symbol(self.vector.name)),
printer.doprint(sp.Symbol(self.scalar.name+nameSuffix)))
else:
args = [r"\partial_%d(%s %s)" % (i, printer.doprint(sp.Symbol(self.scalar.name+nameSuffix)),
printer.doprint(self.vector[i]))
for i in range(self.dim)]
return " + ".join(args)
def advection(scalar, vector, idx=None):
assert isinstance(scalar, Field), "Advected scalar has to be a pystencils.Field"
args = [scalar.center(), vector if not isinstance(vector, Field) else vector.center()]
if idx is not None:
args.append(idx)
return Advection(*args)
class Diffusion(sp.Function):
@property
def scalar(self):
return self.args[0].field
@property
def diffusionCoeff(self):
if isinstance(self.args[1], Field.Access):
return self.args[1].field
else:
return self.args[1]
@property
def scalarIndex(self):
return None if len(self.args) <= 2 else int(self.args[2])
@property
def dim(self):
return self.scalar.spatialDimensions
def _latex(self, printer):
nameSuffix = "_%s" % self.scalarIndex if self.scalarIndex is not None else ""
diffCoeff = sp.Symbol(self.diffusionCoeff.name) if isinstance(self.diffusionCoeff, Field) else self.diffusionCoeff
return r"div(%s \nabla %s)" % (printer.doprint(diffCoeff),
printer.doprint(sp.Symbol(self.scalar.name+nameSuffix)))
def diffusion(scalar, diffusionCoeff, idx=None):
assert isinstance(scalar, Field), "Advected scalar has to be a pystencils.Field"
args = [scalar.center(), diffusionCoeff if not isinstance(diffusionCoeff, Field) else diffusionCoeff.center()]
if idx is not None:
args.append(idx)
return Diffusion(*args)
class Transient(sp.Function):
@property
def scalar(self):
return self.args[0].field
@property
def scalarIndex(self):
return None if len(self.args) <= 1 else int(self.args[1])
def _latex(self, printer):
nameSuffix = "_%s" % self.scalarIndex if self.scalarIndex is not None else ""
return r"\partial_t %s" % (printer.doprint(sp.Symbol(self.scalar.name+nameSuffix)),)
def transient(scalar, idx=None):
args = [scalar.center()]
if idx is not None:
args.append(idx)
return Transient(*args)
class Discretization2ndOrder:
def __init__(self, dx=sp.Symbol("dx"), dt=sp.Symbol("dt")):
self.dx = dx
self.dt = dt
def _discretize_diffusion(self, expr):
result = 0
scalar = expr.scalar
for c in range(expr.dim):
if isinstance(expr.diffusionCoeff, Field):
firstDiffs = [offset *
(scalar.neighbor(c, offset) * expr.diffusionCoeff.neighbor(c, offset) -
scalar.center() * expr.diffusionCoeff.center())
for offset in [-1, 1]]
else:
firstDiffs = [offset *
(scalar.neighbor(c, offset) * expr.diffusionCoeff -
scalar.center() * expr.diffusionCoeff)
for offset in [-1, 1]]
result += firstDiffs[1] - firstDiffs[0]
return result / (self.dx**2)
def _discretize_advection(self, expr):
idx = 0 if expr.scalarIndex is None else int(expr.scalarIndex)
result = 0
for c in range(expr.dim):
if isinstance(expr.vector, Field):
assert expr.vector.indexDimensions == 1
interpolated = [(expr.scalar.neighbor(c, offset)(idx) * expr.vector.neighbor(c, offset)(c) +
expr.scalar.neighbor(c, 0)(idx) * expr.vector.neighbor(c, 0)(c)) / 2
for offset in [-1, 1]]
else:
interpolated = [(expr.scalar.neighbor(c, offset)(idx) * expr.vector(c) -
expr.scalar.neighbor(c, 0)(idx) * expr.vector(c)) / 2
for offset in [-1, 1]]
result += interpolated[1] - interpolated[0]
return result / self.dx
def _discretizeSpatial(self, e):
if isinstance(e, Diffusion):
return self._discretize_diffusion(e)
elif isinstance(e, Advection):
return self._discretize_advection(e)
else:
newArgs = [self._discretizeSpatial(a) for a in e.args]
return e.func(*newArgs) if newArgs else e
def __call__(self, expr):
transientTerms = expr.atoms(Transient)
if len(transientTerms) == 0:
return self._discretizeSpatial(expr)
elif len(transientTerms) == 1:
transientTerm = transientTerms.pop()
solveResult = sp.solve(expr, transientTerm)
if len(solveResult) != 1:
raise ValueError("Could not solve for transient term" + str(solveResult))
rhs = solveResult.pop()
idx = 0 if transientTerm.scalarIndex is None else transientTerm.scalarIndex
# explicit euler
return transientTerm.scalar(idx) + self.dt * self._discretizeSpatial(rhs)
else:
raise NotImplementedError("Cannot discretize expression with more than one transient term")
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment