Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
pycodegen
pystencils
Commits
1af235da
Commit
1af235da
authored
Jan 14, 2020
by
Michael Kuron
Browse files
Automatic Finite Volume and Volume-of-Fluid discretization
parent
171d844a
Pipeline
#21012
passed with stage
in 4 minutes and 28 seconds
Changes
3
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
pystencils/fd/__init__.py
View file @
1af235da
...
@@ -3,10 +3,11 @@ from .derivative import (
...
@@ -3,10 +3,11 @@ from .derivative import (
expand_diff_full
,
expand_diff_linear
,
expand_diff_products
,
functional_derivative
,
expand_diff_full
,
expand_diff_linear
,
expand_diff_products
,
functional_derivative
,
normalize_diff_order
,
zero_diffs
)
normalize_diff_order
,
zero_diffs
)
from
.finitedifferences
import
Discretization2ndOrder
,
advection
,
diffusion
,
transient
from
.finitedifferences
import
Discretization2ndOrder
,
advection
,
diffusion
,
transient
from
.finitevolumes
import
FVM1stOrder
,
VOF
from
.spatial
import
discretize_spatial
,
discretize_spatial_staggered
from
.spatial
import
discretize_spatial
,
discretize_spatial_staggered
__all__
=
[
'Diff'
,
'diff'
,
'DiffOperator'
,
'diff_terms'
,
'collect_diffs'
,
__all__
=
[
'Diff'
,
'diff'
,
'DiffOperator'
,
'diff_terms'
,
'collect_diffs'
,
'zero_diffs'
,
'evaluate_diffs'
,
'normalize_diff_order'
,
'expand_diff_full'
,
'expand_diff_linear'
,
'zero_diffs'
,
'evaluate_diffs'
,
'normalize_diff_order'
,
'expand_diff_full'
,
'expand_diff_linear'
,
'expand_diff_products'
,
'combine_diff_products'
,
'functional_derivative'
,
'expand_diff_products'
,
'combine_diff_products'
,
'functional_derivative'
,
'advection'
,
'diffusion'
,
'transient'
,
'Discretization2ndOrder'
,
'discretize_spatial'
,
'advection'
,
'diffusion'
,
'transient'
,
'Discretization2ndOrder'
,
'discretize_spatial'
,
'discretize_spatial_staggered'
]
'discretize_spatial_staggered'
,
'FVM1stOrder'
,
'VOF'
]
pystencils/fd/finitevolumes.py
0 → 100644
View file @
1af235da
import
pystencils
as
ps
import
sympy
as
sp
from
pystencils.fd.derivation
import
FiniteDifferenceStaggeredStencilDerivation
as
FDS
,
\
FiniteDifferenceStencilDerivation
as
FD
import
itertools
from
collections
import
defaultdict
from
collections.abc
import
Iterable
class
FVM1stOrder
:
"""Finite-volume discretization
Args:
field: the field with the quantity to calculate, e.g. a concentration
flux: a list of sympy expressions that specify the flux, one for each cartesian direction
source: a list of sympy expressions that specify the source
"""
def
__init__
(
self
,
field
:
ps
.
field
.
Field
,
flux
=
0
,
source
=
0
):
def
normalize
(
f
,
shape
):
shape
=
tuple
(
s
for
s
in
shape
if
s
!=
1
)
if
not
shape
:
shape
=
None
if
isinstance
(
f
,
sp
.
Array
)
or
isinstance
(
f
,
Iterable
)
or
isinstance
(
f
,
sp
.
Matrix
):
return
sp
.
Array
(
f
,
shape
)
else
:
return
sp
.
Array
([
f
]
*
(
sp
.
Mul
(
*
shape
)
if
shape
else
1
))
self
.
c
=
field
self
.
dim
=
self
.
c
.
spatial_dimensions
self
.
j
=
normalize
(
flux
,
(
self
.
dim
,
)
+
self
.
c
.
index_shape
)
self
.
q
=
normalize
(
source
,
self
.
c
.
index_shape
)
def
discrete_flux
(
self
,
flux_field
:
ps
.
field
.
Field
):
"""Return a list of assignments for the discrete fluxes
Args:
flux_field: a staggered field to which the fluxes should be assigned
"""
assert
ps
.
FieldType
.
is_staggered
(
flux_field
)
def
discretize
(
term
,
neighbor
):
if
isinstance
(
term
,
sp
.
Matrix
):
nw
=
term
.
applyfunc
(
lambda
t
:
discretize
(
t
,
neighbor
))
return
nw
elif
isinstance
(
term
,
ps
.
field
.
Field
.
Access
):
avg
=
(
term
.
get_shifted
(
*
neighbor
)
+
term
)
*
sp
.
Rational
(
1
,
2
)
return
avg
elif
isinstance
(
term
,
ps
.
fd
.
Diff
):
direction1
=
term
.
args
[
1
]
if
isinstance
(
term
.
args
[
0
],
ps
.
Field
.
Access
):
# first derivative
access
=
term
.
args
[
0
]
direction
=
(
direction1
,)
elif
isinstance
(
term
.
args
[
0
],
ps
.
fd
.
Diff
):
# nested derivative
if
isinstance
(
term
.
args
[
0
].
args
[
0
],
ps
.
fd
.
Diff
):
# third or higher derivative
raise
ValueError
(
"can only handle first and second derivatives"
)
elif
not
isinstance
(
term
.
args
[
0
].
args
[
0
],
ps
.
Field
.
Access
):
raise
ValueError
(
"can only handle derivatives of field accesses"
)
access
,
direction2
=
term
.
args
[
0
].
args
[:
2
]
direction
=
(
direction1
,
direction2
)
else
:
raise
NotImplementedError
(
"can only deal with derivatives of field accesses, "
"but not {}; expansion of derivatives probably failed"
.
format
(
type
(
term
.
args
[
0
])))
fds
=
FDS
(
neighbor
,
access
.
field
.
spatial_dimensions
,
direction
)
return
fds
.
apply
(
access
)
if
term
.
args
:
new_args
=
[
discretize
(
a
,
neighbor
)
for
a
in
term
.
args
]
return
term
.
func
(
*
new_args
)
else
:
return
term
fluxes
=
self
.
j
.
applyfunc
(
ps
.
fd
.
derivative
.
expand_diff_full
)
fluxes
=
[
sp
.
Matrix
(
fluxes
.
tolist
()[
i
])
if
flux_field
.
index_dimensions
>
1
else
fluxes
.
tolist
()[
i
]
for
i
in
range
(
self
.
dim
)]
discrete_fluxes
=
[]
for
neighbor
in
flux_field
.
staggered_stencil
:
neighbor
=
ps
.
stencil
.
direction_string_to_offset
(
neighbor
)
directional_flux
=
fluxes
[
0
]
*
int
(
neighbor
[
0
])
for
i
in
range
(
1
,
self
.
dim
):
directional_flux
+=
fluxes
[
i
]
*
int
(
neighbor
[
i
])
discrete_flux
=
discretize
(
directional_flux
,
neighbor
)
discrete_fluxes
.
append
(
discrete_flux
)
if
flux_field
.
index_dimensions
>
1
:
return
[
ps
.
Assignment
(
lhs
,
rhs
)
for
i
,
d
in
enumerate
(
flux_field
.
staggered_stencil
)
if
discrete_fluxes
[
i
]
for
lhs
,
rhs
in
zip
(
flux_field
.
staggered_vector_access
(
d
),
sp
.
simplify
(
discrete_fluxes
[
i
]))]
else
:
return
[
ps
.
Assignment
(
flux_field
.
staggered_access
(
d
),
sp
.
simplify
(
discrete_fluxes
[
i
]))
for
i
,
d
in
enumerate
(
flux_field
.
staggered_stencil
)]
def
discrete_source
(
self
):
"""Return a list of assignments for the discrete source term"""
def
discretize
(
term
):
if
isinstance
(
term
,
ps
.
fd
.
Diff
):
direction1
=
term
.
args
[
1
]
if
isinstance
(
term
.
args
[
0
],
ps
.
Field
.
Access
):
# first derivative
access
=
term
.
args
[
0
]
direction
=
(
direction1
,)
elif
isinstance
(
term
.
args
[
0
],
ps
.
fd
.
Diff
):
# nested derivative
if
isinstance
(
term
.
args
[
0
].
args
[
0
],
ps
.
fd
.
Diff
):
# third or higher derivative
raise
ValueError
(
"can only handle first and second derivatives"
)
elif
not
isinstance
(
term
.
args
[
0
].
args
[
0
],
ps
.
Field
.
Access
):
raise
ValueError
(
"can only handle derivatives of field accesses"
)
access
,
direction2
=
term
.
args
[
0
].
args
[:
2
]
direction
=
(
direction1
,
direction2
)
else
:
raise
NotImplementedError
(
"can only deal with derivatives of field accesses, "
"but not {}; expansion of derivatives probably failed"
.
format
(
type
(
term
.
args
[
0
])))
if
self
.
dim
==
2
:
stencil
=
[
""
.
join
(
a
).
replace
(
" "
,
""
)
for
a
in
itertools
.
product
(
"NS "
,
"EW "
)
if
""
.
join
(
a
).
strip
()]
else
:
stencil
=
[
""
.
join
(
a
).
replace
(
" "
,
""
)
for
a
in
itertools
.
product
(
"NS "
,
"EW "
,
"TB "
)
if
""
.
join
(
a
).
strip
()]
stencil
=
[
tuple
(
ps
.
stencil
.
direction_string_to_offset
(
d
,
self
.
dim
))
for
d
in
stencil
]
derivation
=
FD
(
direction
,
stencil
).
get_stencil
()
if
not
derivation
.
accuracy
:
raise
Exception
(
'the requested derivative cannot be performed with the available neighbors'
)
weights
=
derivation
.
weights
# if the weights are underdefined, we can choose the free symbols to find the sparsest stencil
free_weights
=
set
(
itertools
.
chain
(
*
[
w
.
free_symbols
for
w
in
weights
]))
if
len
(
free_weights
)
>
0
:
zero_counts
=
defaultdict
(
list
)
for
values
in
itertools
.
product
([
-
1
,
-
sp
.
Rational
(
1
,
2
),
0
,
1
,
sp
.
Rational
(
1
,
2
)],
repeat
=
len
(
free_weights
)):
subs
=
{
free_weight
:
value
for
free_weight
,
value
in
zip
(
free_weights
,
values
)}
weights
=
[
w
.
subs
(
subs
)
for
w
in
derivation
.
weights
]
if
not
all
(
a
==
0
for
a
in
weights
):
zero_count
=
sum
([
1
for
w
in
weights
if
w
==
0
])
zero_counts
[
zero_count
].
append
(
weights
)
best
=
zero_counts
[
max
(
zero_counts
.
keys
())]
if
len
(
best
)
>
1
:
raise
NotImplementedError
(
"more than one suitable set of weights found, "
"don't know how to proceed"
)
weights
=
best
[
0
]
assert
weights
if
access
.
_field
.
index_dimensions
==
0
:
return
sum
([
access
.
_field
.
__getitem__
(
point
)
*
weight
for
point
,
weight
in
zip
(
stencil
,
weights
)])
else
:
total
=
access
.
get_shifted
(
*
stencil
[
0
]).
at_index
(
*
access
.
index
)
*
weights
[
0
]
for
point
,
weight
in
zip
(
stencil
[
1
:],
weights
[
1
:]):
addl
=
access
.
get_shifted
(
*
point
).
at_index
(
*
access
.
index
)
*
weight
total
+=
addl
return
total
if
term
.
args
:
new_args
=
[
discretize
(
a
)
for
a
in
term
.
args
]
return
term
.
func
(
*
new_args
)
else
:
return
term
source
=
self
.
q
.
applyfunc
(
ps
.
fd
.
derivative
.
expand_diff_full
)
source
=
source
.
applyfunc
(
discretize
)
return
[
ps
.
Assignment
(
lhs
,
rhs
)
for
lhs
,
rhs
in
zip
(
self
.
c
.
center_vector
,
sp
.
flatten
(
source
))
if
rhs
]
def
discrete_continuity
(
self
,
flux_field
:
ps
.
field
.
Field
):
"""Return a list of assignments for the continuity equation, which includes the source term
Args:
flux_field: a staggered field from which the fluxes are taken
"""
assert
ps
.
FieldType
.
is_staggered
(
flux_field
)
neighbors
=
flux_field
.
staggered_stencil
+
[
ps
.
stencil
.
inverse_direction_string
(
d
)
for
d
in
flux_field
.
staggered_stencil
]
divergence
=
flux_field
.
staggered_vector_access
(
neighbors
[
0
])
/
\
sp
.
Matrix
(
ps
.
stencil
.
direction_string_to_offset
(
neighbors
[
0
])).
norm
()
for
d
in
neighbors
[
1
:]:
divergence
+=
flux_field
.
staggered_vector_access
(
d
)
/
\
sp
.
Matrix
(
ps
.
stencil
.
direction_string_to_offset
(
d
)).
norm
()
A0
=
sum
([
sp
.
Matrix
(
ps
.
stencil
.
direction_string_to_offset
(
d
)).
norm
()
for
d
in
flux_field
.
staggered_stencil
])
/
self
.
dim
divergence
/=
A0
source
=
self
.
discrete_source
()
source
=
{
s
.
lhs
:
s
.
rhs
for
s
in
source
}
return
[
ps
.
Assignment
(
lhs
,
(
lhs
-
rhs
+
source
[
lhs
])
if
lhs
in
source
else
(
lhs
-
rhs
))
for
lhs
,
rhs
in
zip
(
self
.
c
.
center_vector
,
divergence
)]
def
VOF
(
j
:
ps
.
field
.
Field
,
v
:
ps
.
field
.
Field
,
ρ
:
ps
.
field
.
Field
):
"""Volume-of-fluid discretization of advection
Args:
j: the staggeredfield to write the fluxes to
v: the flow velocity field
ρ: the quantity to advect
"""
assert
ps
.
FieldType
.
is_staggered
(
j
)
fluxes
=
[[]
for
i
in
range
(
j
.
index_shape
[
0
])]
v0
=
v
.
center_vector
for
d
,
neighbor
in
enumerate
(
j
.
staggered_stencil
):
c
=
ps
.
stencil
.
direction_string_to_offset
(
neighbor
)
v1
=
v
.
neighbor_vector
(
c
)
# going out
cond
=
sp
.
And
(
*
[
sp
.
Or
(
c
[
i
]
*
v1
[
i
]
>
0
,
c
[
i
]
==
0
)
for
i
in
range
(
len
(
v0
))])
overlap1
=
[
1
-
sp
.
Abs
(
v0
[
i
])
for
i
in
range
(
len
(
v0
))]
overlap2
=
[
c
[
i
]
*
v0
[
i
]
for
i
in
range
(
len
(
v0
))]
overlap
=
sp
.
Mul
(
*
[(
overlap1
[
i
]
if
c
[
i
]
==
0
else
overlap2
[
i
])
for
i
in
range
(
len
(
v0
))])
fluxes
[
d
].
append
(
ρ
.
center_vector
*
overlap
*
sp
.
Piecewise
((
1
,
cond
),
(
0
,
True
)))
# coming in
cond
=
sp
.
And
(
*
[
sp
.
Or
(
c
[
i
]
*
v1
[
i
]
<
0
,
c
[
i
]
==
0
)
for
i
in
range
(
len
(
v1
))])
overlap1
=
[
1
-
sp
.
Abs
(
v1
[
i
])
for
i
in
range
(
len
(
v1
))]
overlap2
=
[
c
[
i
]
*
v1
[
i
]
for
i
in
range
(
len
(
v1
))]
overlap
=
sp
.
Mul
(
*
[(
overlap1
[
i
]
if
c
[
i
]
==
0
else
overlap2
[
i
])
for
i
in
range
(
len
(
v1
))])
fluxes
[
d
].
append
(
ρ
.
neighbor_vector
(
c
)
*
overlap
*
sp
.
Piecewise
((
1
,
cond
),
(
0
,
True
)))
for
i
,
ff
in
enumerate
(
fluxes
):
fluxes
[
i
]
=
ff
[
0
]
for
f
in
ff
:
fluxes
[
i
]
+=
f
assignments
=
[]
for
i
,
d
in
enumerate
(
j
.
staggered_stencil
):
for
lhs
,
rhs
in
zip
(
j
.
staggered_vector_access
(
d
).
values
(),
fluxes
[
i
].
values
()):
assignments
.
append
(
ps
.
Assignment
(
lhs
,
rhs
))
return
assignments
pystencils_tests/test_fvm.py
0 → 100644
View file @
1af235da
import
sympy
as
sp
import
pystencils
as
ps
def
test_ek
():
# parameters
L
=
(
40
,
40
)
D
=
sp
.
Symbol
(
"D"
)
z
=
sp
.
Symbol
(
"z"
)
# data structures
dh
=
ps
.
create_data_handling
(
L
,
periodicity
=
True
,
default_target
=
'cpu'
)
c
=
dh
.
add_array
(
'c'
,
values_per_cell
=
1
)
v
=
dh
.
add_array
(
'v'
,
values_per_cell
=
dh
.
dim
)
j
=
dh
.
add_array
(
'j'
,
values_per_cell
=
dh
.
dim
*
2
,
field_type
=
ps
.
FieldType
.
STAGGERED_FLUX
)
Phi
=
dh
.
add_array
(
'Φ'
,
values_per_cell
=
1
)
# perform automatic discretization
def
Gradient
(
f
):
return
sp
.
Matrix
([
ps
.
fd
.
diff
(
f
,
i
)
for
i
in
range
(
dh
.
dim
)])
flux_eq
=
-
D
*
Gradient
(
c
)
+
D
*
z
*
c
.
center
*
Gradient
(
Phi
)
disc
=
ps
.
fd
.
FVM1stOrder
(
c
,
flux_eq
)
flux_assignments
=
disc
.
discrete_flux
(
j
)
advection_assignments
=
ps
.
fd
.
VOF
(
j
,
v
,
c
)
continuity_assignments
=
disc
.
discrete_continuity
(
j
)
# manual discretization
x_staggered
=
-
c
[
-
1
,
0
]
+
c
[
0
,
0
]
+
z
*
(
c
[
-
1
,
0
]
+
c
[
0
,
0
])
/
2
*
(
Phi
[
-
1
,
0
]
-
Phi
[
0
,
0
])
y_staggered
=
-
c
[
0
,
-
1
]
+
c
[
0
,
0
]
+
z
*
(
c
[
0
,
-
1
]
+
c
[
0
,
0
])
/
2
*
(
Phi
[
0
,
-
1
]
-
Phi
[
0
,
0
])
xy_staggered
=
-
c
[
-
1
,
-
1
]
+
c
[
0
,
0
]
+
z
*
(
c
[
-
1
,
-
1
]
+
c
[
0
,
0
])
/
2
*
(
Phi
[
-
1
,
-
1
]
-
Phi
[
0
,
0
])
xY_staggered
=
-
c
[
-
1
,
1
]
+
c
[
0
,
0
]
+
z
*
(
c
[
-
1
,
1
]
+
c
[
0
,
0
])
/
2
*
(
Phi
[
-
1
,
1
]
-
Phi
[
0
,
0
])
jj
=
j
.
staggered_access
divergence
=
-
1
/
(
1
+
sp
.
sqrt
(
2
)
if
j
.
index_shape
[
0
]
==
4
else
1
)
*
\
sum
([
jj
(
d
)
/
sp
.
Matrix
(
ps
.
stencil
.
direction_string_to_offset
(
d
)).
norm
()
for
d
in
j
.
staggered_stencil
+
[
ps
.
stencil
.
inverse_direction_string
(
d
)
for
d
in
j
.
staggered_stencil
]])
update
=
[
ps
.
Assignment
(
c
.
center
,
c
.
center
+
divergence
)]
flux
=
[
ps
.
Assignment
(
j
.
staggered_access
(
"W"
),
D
*
x_staggered
),
ps
.
Assignment
(
j
.
staggered_access
(
"S"
),
D
*
y_staggered
)]
if
j
.
index_shape
[
0
]
==
4
:
flux
+=
[
ps
.
Assignment
(
j
.
staggered_access
(
"SW"
),
D
*
xy_staggered
),
ps
.
Assignment
(
j
.
staggered_access
(
"NW"
),
D
*
xY_staggered
)]
# compare
for
a
,
b
in
zip
(
flux
,
flux_assignments
):
assert
a
.
lhs
==
b
.
lhs
assert
sp
.
simplify
(
a
.
rhs
-
b
.
rhs
)
==
0
for
a
,
b
in
zip
(
update
,
continuity_assignments
):
assert
a
.
lhs
==
b
.
lhs
assert
a
.
rhs
==
b
.
rhs
# TODO: test advection and source
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment