Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Showing
with 2108 additions and 34 deletions
import sympy as sp
import pystencils as ps
import numpy as np
import pytest
from itertools import product
from pystencils.rng import random_symbol
from pystencils.astnodes import SympyAssignment
from pystencils.node_collection import NodeCollection
def advection_diffusion(dim: int):
# parameters
if dim == 2:
L = (32, 32)
elif dim == 3:
L = (16, 16, 16)
dh = ps.create_data_handling(domain_size=L, periodicity=True, default_target=ps.Target.CPU)
n_field = dh.add_array('n', values_per_cell=1)
j_field = dh.add_array('j', values_per_cell=3 ** dim // 2, field_type=ps.FieldType.STAGGERED_FLUX)
velocity_field = dh.add_array('v', values_per_cell=dim)
D = 0.0666
time = 100
def grad(f):
return sp.Matrix([ps.fd.diff(f, i) for i in range(dim)])
flux_eq = - D * grad(n_field)
fvm_eq = ps.fd.FVM1stOrder(n_field, flux=flux_eq)
vof_adv = ps.fd.VOF(j_field, velocity_field, n_field)
# merge calculation of advection and diffusion terms
flux = []
for adv, div in zip(vof_adv, fvm_eq.discrete_flux(j_field)):
assert adv.lhs == div.lhs
flux.append(ps.Assignment(adv.lhs, adv.rhs + div.rhs))
flux_kernel = ps.create_staggered_kernel(flux).compile()
pde_kernel = ps.create_kernel(fvm_eq.discrete_continuity(j_field)).compile()
sync_conc = dh.synchronization_function([n_field.name])
# analytical density calculation
def density(pos: np.ndarray, time: int, D: float):
return (4 * np.pi * D * time)**(-dim / 2) * \
np.exp(-np.sum(np.square(pos), axis=-1) / (4 * D * time))
pos = np.zeros((*L, dim))
xpos = np.arange(-L[0] // 2, L[0] // 2)
ypos = np.arange(-L[1] // 2, L[1] // 2)
if dim == 2:
pos[..., 1], pos[..., 0] = np.meshgrid(xpos, ypos)
elif dim == 3:
zpos = np.arange(-L[2] // 2, L[2] // 2)
pos[..., 2], pos[..., 1], pos[..., 0] = np.meshgrid(xpos, ypos, zpos)
pos += 0.5
def run(velocity: np.ndarray, time: int):
dh.fill(n_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(j_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
# set initial values for velocity and density
for i in range(dim):
dh.fill(velocity_field.name, velocity[i], i, ghost_layers=True, inner_ghost_layers=True)
dh.fill(n_field.name, 0)
if dim == 2:
start = ps.make_slice[L[0] // 2 - 1:L[0] // 2 + 1, L[1] // 2 - 1:L[1] // 2 + 1]
else:
start = ps.make_slice[L[0] // 2 - 1:L[0] // 2 + 1, L[1] // 2 - 1:L[1] // 2 + 1,
L[2] // 2 - 1:L[2] // 2 + 1]
dh.fill(n_field.name, 2**-dim, slice_obj=start)
sync_conc()
for i in range(time):
dh.run_kernel(flux_kernel)
dh.run_kernel(pde_kernel)
sync_conc()
sim_density = dh.gather_array(n_field.name)
# check that mass was conserved
assert np.isclose(sim_density.sum(), 1)
assert np.all(sim_density > 0)
# check that the maximum is in the right place
peak = np.unravel_index(np.argmax(sim_density, axis=None), sim_density.shape)
assert np.allclose(peak, np.array(L) // 2 - 0.5 + velocity * time, atol=0.5)
# check the concentration profile
if np.linalg.norm(velocity) == 0:
calc_density = density(pos - velocity * time, time, D)
target = [time, D]
pytest.importorskip('scipy.optimize')
from scipy.optimize import curve_fit
popt, _ = curve_fit(lambda x, t, D: density(x - velocity * time, t, D),
pos.reshape(-1, dim),
sim_density.reshape(-1),
p0=target)
assert np.isclose(popt[0], time, rtol=0.1)
assert np.isclose(popt[1], D, rtol=0.1)
assert np.allclose(calc_density, sim_density, atol=1e-4)
return lambda v: run(np.array(v), time)
advection_diffusion.runners = {}
@pytest.mark.parametrize("velocity", list(product([0, -0.047, 0.041], [0, -0.031, 0.023])))
def test_advection_diffusion_2d(velocity):
if 2 not in advection_diffusion.runners:
advection_diffusion.runners[2] = advection_diffusion(2)
advection_diffusion.runners[2](velocity)
@pytest.mark.parametrize("velocity", list(product([0, -0.047, 0.041], [0, -0.031, 0.023], [0, -0.017, 0.011])))
@pytest.mark.longrun
def test_advection_diffusion_3d(velocity):
if 3 not in advection_diffusion.runners:
advection_diffusion.runners[3] = advection_diffusion(3)
advection_diffusion.runners[3](velocity)
def advection_diffusion_fluctuations(dim: int):
# parameters
if dim == 2:
L = (32, 32)
stencil_factor = np.sqrt(1 / (1 + np.sqrt(2)))
elif dim == 3:
L = (16, 16, 16)
stencil_factor = np.sqrt(1 / (1 + 2 * np.sqrt(2) + 4.0 / 3.0 * np.sqrt(3)))
dh = ps.create_data_handling(domain_size=L, periodicity=True, default_target=ps.Target.CPU)
n_field = dh.add_array('n', values_per_cell=1)
j_field = dh.add_array('j', values_per_cell=3 ** dim // 2, field_type=ps.FieldType.STAGGERED_FLUX)
velocity_field = dh.add_array('v', values_per_cell=dim)
D = 0.00666
time = 10000
def grad(f):
return sp.Matrix([ps.fd.diff(f, i) for i in range(dim)])
flux_eq = - D * grad(n_field)
fvm_eq = ps.fd.FVM1stOrder(n_field, flux=flux_eq)
vof_adv = ps.fd.VOF(j_field, velocity_field, n_field)
# merge calculation of advection and diffusion terms
flux = []
for adv, div in zip(vof_adv, fvm_eq.discrete_flux(j_field)):
assert adv.lhs == div.lhs
flux.append(ps.Assignment(adv.lhs, adv.rhs + div.rhs))
flux = ps.AssignmentCollection(flux)
rng_symbol_gen = random_symbol(flux.subexpressions, dim=dh.dim)
for i in range(len(flux.main_assignments)):
n = j_field.staggered_stencil[i]
assert flux.main_assignments[i].lhs == j_field.staggered_access(n)
# calculate mean density
dens = (n_field.neighbor_vector(n) + n_field.center_vector)[0] / 2
# multyply by smoothed haviside function so that fluctuation will not get bigger that the density
dens *= sp.Max(0, sp.Min(1.0, n_field.neighbor_vector(n)[0]) * sp.Min(1.0, n_field.center_vector[0]))
# lenght of the vector
length = sp.sqrt(len(j_field.staggered_stencil[i]))
# amplitude of the random fluctuations
fluct = sp.sqrt(2 * dens * D) * sp.sqrt(1 / length) * stencil_factor
# add fluctuations
fluct *= 2 * (next(rng_symbol_gen) - 0.5) * sp.sqrt(3)
flux.main_assignments[i] = ps.Assignment(flux.main_assignments[i].lhs, flux.main_assignments[i].rhs + fluct)
# Add the folding to the flux, so that the random numbers persist through the ghostlayers.
fold = {ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i):
ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i) % L[i] for i in range(len(L))}
flux.subs(fold)
flux_kernel = ps.create_staggered_kernel(flux).compile()
pde_kernel = ps.create_kernel(fvm_eq.discrete_continuity(j_field)).compile()
sync_conc = dh.synchronization_function([n_field.name])
# analytical density distribution calculation
def P(rho, density_init):
res = []
for r in rho:
res.append(np.power(density_init, r) * np.exp(-density_init) / np.math.gamma(r + 1))
return np.array(res)
def run(density_init: float, velocity: np.ndarray, time: int):
dh.fill(n_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(j_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
# set initial values for velocity and density
for i in range(dim):
dh.fill(velocity_field.name, velocity[i], i, ghost_layers=True, inner_ghost_layers=True)
dh.fill(n_field.name, density_init)
measurement_intervall = 10
warm_up = 1000
data = []
sync_conc()
for i in range(warm_up):
dh.run_kernel(flux_kernel, seed=42, time_step=i)
dh.run_kernel(pde_kernel)
sync_conc()
for i in range(time):
dh.run_kernel(flux_kernel, seed=42, time_step=i + warm_up)
dh.run_kernel(pde_kernel)
sync_conc()
if(i % measurement_intervall == 0):
data = np.append(data, dh.gather_array(n_field.name).ravel(), 0)
# test mass conservation
np.testing.assert_almost_equal(dh.gather_array(n_field.name).mean(), density_init)
n_bins = 50
density_value, bins = np.histogram(data, density=True, bins=n_bins)
bins_mean = bins[:-1] + (bins[1:] - bins[:-1]) / 2
analytical_value = P(bins_mean, density_init)
print(density_value - analytical_value)
np.testing.assert_allclose(density_value, analytical_value, atol=2e-3)
return lambda density_init, v: run(density_init, np.array(v), time)
advection_diffusion_fluctuations.runners = {}
@pytest.mark.parametrize("velocity", list(product([0, 0.00041], [0, -0.00031])))
@pytest.mark.parametrize("density", [27.0, 56.5])
@pytest.mark.longrun
def test_advection_diffusion_fluctuation_2d(density, velocity):
if 2 not in advection_diffusion_fluctuations.runners:
advection_diffusion_fluctuations.runners[2] = advection_diffusion_fluctuations(2)
advection_diffusion_fluctuations.runners[2](density, velocity)
@pytest.mark.parametrize("velocity", [(0.0, 0.0, 0.0), (0.00043, -0.00017, 0.00028)])
@pytest.mark.parametrize("density", [27.0, 56.5])
@pytest.mark.longrun
def test_advection_diffusion_fluctuation_3d(density, velocity):
if 3 not in advection_diffusion_fluctuations.runners:
advection_diffusion_fluctuations.runners[3] = advection_diffusion_fluctuations(3)
advection_diffusion_fluctuations.runners[3](density, velocity)
def diffusion_reaction(fluctuations: bool):
# parameters
L = (32, 32)
stencil_factor = np.sqrt(1 / (1 + np.sqrt(2)))
dh = ps.create_data_handling(domain_size=L, periodicity=True, default_target=ps.Target.CPU)
species = 2
n_fields = []
j_fields = []
r_flux_fields = []
for i in range(species):
n_fields.append(dh.add_array(f'n_{i}', values_per_cell=1))
j_fields.append(dh.add_array(f'j_{i}', values_per_cell=3 ** dh.dim // 2,
field_type=ps.FieldType.STAGGERED_FLUX))
r_flux_fields.append(dh.add_array(f'r_{i}', values_per_cell=1))
velocity_field = dh.add_array('v', values_per_cell=dh.dim)
D = 0.00666
time = 1000
r_order = [2.0, 0.0]
r_rate_const = 0.00001
r_coefs = [-2, 1]
def grad(f):
return sp.Matrix([ps.fd.diff(f, i) for i in range(dh.dim)])
flux_eq = - D * grad(n_fields[0])
fvm_eq = ps.fd.FVM1stOrder(n_fields[0], flux=flux_eq)
vof_adv = ps.fd.VOF(j_fields[0], velocity_field, n_fields[0])
continuity_assignments = fvm_eq.discrete_continuity(j_fields[0])
# merge calculation of advection and diffusion terms
flux = []
for adv, div in zip(vof_adv, fvm_eq.discrete_flux(j_fields[0])):
assert adv.lhs == div.lhs
flux.append(ps.Assignment(adv.lhs, adv.rhs + div.rhs))
flux = ps.AssignmentCollection(flux)
if(fluctuations):
rng_symbol_gen = random_symbol(flux.subexpressions, dim=dh.dim)
for i in range(len(flux.main_assignments)):
n = j_fields[0].staggered_stencil[i]
assert flux.main_assignments[i].lhs == j_fields[0].staggered_access(n)
# calculate mean density
dens = (n_fields[0].neighbor_vector(n) + n_fields[0].center_vector)[0] / 2
# multyply by smoothed haviside function so that fluctuation will not get bigger that the density
dens *= sp.Max(0,
sp.Min(1.0, n_fields[0].neighbor_vector(n)[0]) * sp.Min(1.0, n_fields[0].center_vector[0]))
# lenght of the vector
length = sp.sqrt(len(j_fields[0].staggered_stencil[i]))
# amplitude of the random fluctuations
fluct = sp.sqrt(2 * dens * D) * sp.sqrt(1 / length) * stencil_factor
# add fluctuations
fluct *= 2 * (next(rng_symbol_gen) - 0.5) * sp.sqrt(3)
flux.main_assignments[i] = ps.Assignment(flux.main_assignments[i].lhs, flux.main_assignments[i].rhs + fluct)
# Add the folding to the flux, so that the random numbers persist through the ghostlayers.
fold = {ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i):
ps.astnodes.LoopOverCoordinate.get_loop_counter_symbol(i) % L[i] for i in range(len(L))}
flux.subs(fold)
r_flux = NodeCollection([SympyAssignment(j_fields[i].center, 0) for i in range(species)])
reaction = r_rate_const
for i in range(species):
reaction *= sp.Pow(n_fields[i].center, r_order[i])
new_assignments = []
if fluctuations:
rng_symbol_gen = random_symbol(new_assignments, dim=dh.dim)
reaction_fluctuations = sp.sqrt(sp.Abs(reaction)) * 2 * (next(rng_symbol_gen) - 0.5) * sp.sqrt(3)
reaction_fluctuations *= sp.Min(1, sp.Abs(reaction**2))
else:
reaction_fluctuations = 0.0
for i in range(species):
r_flux.all_assignments[i] = SympyAssignment(
r_flux_fields[i].center, (reaction + reaction_fluctuations) * r_coefs[i])
[r_flux.all_assignments.insert(0, new) for new in new_assignments]
continuity_assignments = [SympyAssignment(*assignment.args) for assignment in continuity_assignments]
continuity_assignments.append(SympyAssignment(n_fields[0].center, n_fields[0].center + r_flux_fields[0].center))
flux_kernel = ps.create_staggered_kernel(flux).compile()
reaction_kernel = ps.create_kernel(r_flux).compile()
config = ps.CreateKernelConfig(allow_double_writes=True)
pde_kernel = ps.create_kernel(continuity_assignments, config=config).compile()
sync_conc = dh.synchronization_function([n_fields[0].name, n_fields[1].name])
def f(t, r, n0, fac, fluctuations):
"""Calculates the amount of product created after a certain time of a reaction with form xA -> B
Args:
t: Time of the reation
r: Reaction rate constant
n0: Initial density of the
fac: Reaction order of A (this in most cases equals the stochometric coefficient x)
fluctuations: Boolian whether fluctuations were included during the reaction.
"""
if fluctuations:
return 1 / fac * (n0 + n0 / (n0 - (n0 + 1) * np.exp(fac * r * t)))
return 1 / fac * (n0 - (1 / (fac * r * t + (1 / n0))))
def run(density_init: float, velocity: np.ndarray, time: int):
for i in range(species):
dh.fill(n_fields[i].name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(j_fields[i].name, 0.0, ghost_layers=True, inner_ghost_layers=True)
dh.fill(r_flux_fields[i].name, 0.0, ghost_layers=True, inner_ghost_layers=True)
# set initial values for velocity and density
for i in range(dh.dim):
dh.fill(velocity_field.name, velocity[i], i, ghost_layers=True, inner_ghost_layers=True)
dh.fill(n_fields[0].name, density_init)
dh.fill(n_fields[1].name, 0.0)
measurement_intervall = 10
data = []
sync_conc()
for i in range(time):
if(i % measurement_intervall == 0):
data.append([i, dh.gather_array(n_fields[1].name).mean(), dh.gather_array(n_fields[0].name).mean()])
dh.run_kernel(reaction_kernel, seed=41, time_step=i)
for s_idx in range(species):
flux_kernel(n_0=dh.cpu_arrays[n_fields[s_idx].name],
j_0=dh.cpu_arrays[j_fields[s_idx].name],
v=dh.cpu_arrays[velocity_field.name], seed=42 + s_idx, time_step=i)
pde_kernel(n_0=dh.cpu_arrays[n_fields[s_idx].name],
j_0=dh.cpu_arrays[j_fields[s_idx].name],
r_0=dh.cpu_arrays[r_flux_fields[s_idx].name])
sync_conc()
data = np.array(data).transpose()
x = data[0]
analytical_value = f(x, r_rate_const, density_init, abs(r_coefs[0]), fluctuations)
# test mass conservation
np.testing.assert_almost_equal(
dh.gather_array(n_fields[0].name).mean() + 2 * dh.gather_array(n_fields[1].name).mean(), density_init)
r_tol = 2e-3
if fluctuations:
r_tol = 3e-2
np.testing.assert_allclose(data[1], analytical_value, rtol=r_tol)
return lambda density_init, v: run(density_init, np.array(v), time)
advection_diffusion_fluctuations.runners = {}
@pytest.mark.parametrize("velocity", list(product([0, 0.0041], [0, -0.0031])))
@pytest.mark.parametrize("density", [27.0, 56.5])
@pytest.mark.parametrize("fluctuations", [False, True])
@pytest.mark.longrun
def test_diffusion_reaction(fluctuations, density, velocity):
diffusion_reaction.runner = diffusion_reaction(fluctuations)
diffusion_reaction.runner(density, velocity)
def VOF2(j: ps.field.Field, v: ps.field.Field, ρ: ps.field.Field, simplify=True):
"""Volume-of-fluid discretization of advection
Args:
j: the staggered field to write the fluxes to. Should have a D2Q9/D3Q27 stencil. Other stencils work too, but
incur a small error (D2Q5/D3Q7: v^2, D3Q19: v^3).
v: the flow velocity field
ρ: the quantity to advect
simplify: whether to simplify the generated expressions (slow, but makes them much more readable and faster)
"""
dim = j.spatial_dimensions
assert ps.FieldType.is_staggered(j)
def assume_velocity(e):
if not simplify:
return e
repl = {}
for c in e.atoms(sp.StrictGreaterThan, sp.GreaterThan):
if isinstance(c.lhs, ps.field.Field.Access) and c.lhs.field == v and isinstance(c.rhs, sp.Number):
if c.rhs <= -1:
repl[c] = True
elif c.rhs >= 1:
repl[c] = False
for c in e.atoms(sp.StrictLessThan, sp.LessThan):
if isinstance(c.lhs, ps.field.Field.Access) and c.lhs.field == v and isinstance(c.rhs, sp.Number):
if c.rhs >= 1:
repl[c] = True
elif c.rhs <= -1:
repl[c] = False
for c in e.atoms(sp.Equality):
if isinstance(c.lhs, ps.field.Field.Access) and c.lhs.field == v and isinstance(c.rhs, sp.Number):
if c.rhs <= -1 or c.rhs >= 1:
repl[c] = False
return e.subs(repl)
class AABB:
def __init__(self, corner0, corner1):
self.dim = len(corner0)
self.minCorner = sp.zeros(self.dim, 1)
self.maxCorner = sp.zeros(self.dim, 1)
for i in range(self.dim):
self.minCorner[i] = sp.Piecewise((corner0[i], corner0[i] < corner1[i]), (corner1[i], True))
self.maxCorner[i] = sp.Piecewise((corner1[i], corner0[i] < corner1[i]), (corner0[i], True))
def intersect(self, other):
minCorner = [sp.Max(self.minCorner[d], other.minCorner[d]) for d in range(self.dim)]
maxCorner = [sp.Max(minCorner[d], sp.Min(self.maxCorner[d], other.maxCorner[d]))
for d in range(self.dim)]
return AABB(minCorner, maxCorner)
@property
def volume(self):
v = sp.prod([self.maxCorner[d] - self.minCorner[d] for d in range(self.dim)])
if simplify:
return sp.simplify(assume_velocity(v.rewrite(sp.Piecewise)))
else:
return v
fluxes = []
cell = AABB([-0.5] * dim, [0.5] * dim)
cell_s = AABB(sp.Matrix([-0.5] * dim) + v.center_vector, sp.Matrix([0.5] * dim) + v.center_vector)
for d, neighbor in enumerate(j.staggered_stencil):
c = sp.Matrix(ps.stencil.direction_string_to_offset(neighbor)[:dim])
cell_n = AABB(sp.Matrix([-0.5] * dim) + c, sp.Matrix([0.5] * dim) + c)
cell_ns = AABB(sp.Matrix([-0.5] * dim) + c + v.neighbor_vector(neighbor),
sp.Matrix([0.5] * dim) + c + v.neighbor_vector(neighbor))
fluxes.append(assume_velocity(ρ.center_vector * cell_s.intersect(cell_n).volume
- ρ.neighbor_vector(neighbor) * cell_ns.intersect(cell).volume))
assignments = []
for i, d in enumerate(j.staggered_stencil):
for lhs, rhs in zip(j.staggered_vector_access(d).values(), fluxes[i].values()):
assignments.append(ps.Assignment(lhs, rhs))
return assignments
@pytest.mark.parametrize("dim", [2, 3])
def test_advection(dim):
L = (8,) * dim
dh = ps.create_data_handling(L, periodicity=True, default_target=ps.Target.CPU)
c = dh.add_array('c', values_per_cell=1)
j = dh.add_array('j', values_per_cell=3 ** dh.dim // 2, field_type=ps.FieldType.STAGGERED_FLUX)
u = dh.add_array('u', values_per_cell=dh.dim)
dh.cpu_arrays[c.name][:] = (np.random.random([l + 2 for l in L]))
dh.cpu_arrays[u.name][:] = (np.random.random([l + 2 for l in L] + [dim]) - 0.5) / 5
vof1 = ps.create_kernel(ps.fd.VOF(j, u, c)).compile()
dh.fill(j.name, np.nan, ghost_layers=True)
dh.run_kernel(vof1)
j1 = dh.gather_array(j.name).copy()
vof2 = ps.create_kernel(VOF2(j, u, c, simplify=False)).compile()
dh.fill(j.name, np.nan, ghost_layers=True)
dh.run_kernel(vof2)
j2 = dh.gather_array(j.name)
assert np.allclose(j1, j2)
@pytest.mark.parametrize("stencil", ["D2Q5", "D2Q9"])
def test_ek(stencil):
# parameters
L = (40, 40)
D = sp.Symbol("D")
z = sp.Symbol("z")
# data structures
dh = ps.create_data_handling(L, periodicity=True, default_target=ps.Target.CPU)
c = dh.add_array('c', values_per_cell=1)
j = dh.add_array('j', values_per_cell=int(stencil[-1]) // 2, field_type=ps.FieldType.STAGGERED_FLUX)
Phi = dh.add_array('Φ', values_per_cell=1)
# perform automatic discretization
def Gradient(f):
return sp.Matrix([ps.fd.diff(f, i) for i in range(dh.dim)])
flux_eq = -D * Gradient(c) + D * z * c.center * Gradient(Phi)
disc = ps.fd.FVM1stOrder(c, flux_eq)
flux_assignments = disc.discrete_flux(j)
continuity_assignments = disc.discrete_continuity(j)
# manual discretization
x_staggered = - c[-1, 0] + c[0, 0] + z * (c[-1, 0] + c[0, 0]) / 2 * (Phi[-1, 0] - Phi[0, 0])
y_staggered = - c[0, -1] + c[0, 0] + z * (c[0, -1] + c[0, 0]) / 2 * (Phi[0, -1] - Phi[0, 0])
xy_staggered = (- c[-1, -1] + c[0, 0]) / sp.sqrt(2) + \
z * (c[-1, -1] + c[0, 0]) / 2 * (Phi[-1, -1] - Phi[0, 0]) / sp.sqrt(2)
xY_staggered = (- c[-1, 1] + c[0, 0]) / sp.sqrt(2) + \
z * (c[-1, 1] + c[0, 0]) / 2 * (Phi[-1, 1] - Phi[0, 0]) / sp.sqrt(2)
A0 = (1 + sp.sqrt(2) if j.index_shape[0] == 4 else 1)
jj = j.staggered_access
divergence = -1 * sum([jj(d) for d in j.staggered_stencil
+ [ps.stencil.inverse_direction_string(d) for d in j.staggered_stencil]])
update = [ps.Assignment(c.center, c.center + divergence)]
flux = [ps.Assignment(j.staggered_access("W"), D * x_staggered / A0),
ps.Assignment(j.staggered_access("S"), D * y_staggered / A0)]
if j.index_shape[0] == 4:
flux += [ps.Assignment(j.staggered_access("SW"), D * xy_staggered / A0),
ps.Assignment(j.staggered_access("NW"), D * xY_staggered / A0)]
# compare
for a, b in zip(flux, flux_assignments):
assert a.lhs == b.lhs
assert sp.simplify(a.rhs - b.rhs) == 0
for a, b in zip(update, continuity_assignments):
assert a.lhs == b.lhs
assert a.rhs == b.rhs
# TODO: test source
@pytest.mark.parametrize("stencil", ["D2Q5", "D2Q9", "D3Q7", "D3Q19", "D3Q27"])
@pytest.mark.parametrize("derivative", [0, 1])
def test_flux_stencil(stencil, derivative):
L = (40, ) * int(stencil[1])
dh = ps.create_data_handling(L, periodicity=True, default_target=ps.Target.CPU)
c = dh.add_array('c', values_per_cell=1)
j = dh.add_array('j', values_per_cell=int(stencil[3:]) // 2, field_type=ps.FieldType.STAGGERED_FLUX)
def Gradient(f):
return sp.Matrix([ps.fd.diff(f, i) for i in range(dh.dim)])
eq = [sp.Matrix([sp.Symbol(f"a_{i}") * c.center for i in range(dh.dim)]), Gradient(c)][derivative]
disc = ps.fd.FVM1stOrder(c, flux=eq)
# check the continuity
continuity_assignments = disc.discrete_continuity(j)
assert [len(a.rhs.atoms(ps.field.Field.Access)) for a in continuity_assignments] == \
[int(stencil[3:])] * len(continuity_assignments)
# check the flux
flux_assignments = disc.discrete_flux(j)
assert [len(a.rhs.atoms(ps.field.Field.Access)) for a in flux_assignments] == [2] * len(flux_assignments)
@pytest.mark.parametrize("stencil", ["D2Q5", "D2Q9", "D3Q7", "D3Q19", "D3Q27"])
def test_source_stencil(stencil):
L = (40, ) * int(stencil[1])
dh = ps.create_data_handling(L, periodicity=True, default_target=ps.Target.CPU)
c = dh.add_array('c', values_per_cell=1)
j = dh.add_array('j', values_per_cell=int(stencil[3:]) // 2, field_type=ps.FieldType.STAGGERED_FLUX)
continuity_ref = ps.fd.FVM1stOrder(c).discrete_continuity(j)
for eq in [c.center] + [ps.fd.diff(c, i) for i in range(dh.dim)]:
disc = ps.fd.FVM1stOrder(c, source=eq)
diff = sp.simplify(disc.discrete_continuity(j)[0].rhs - continuity_ref[0].rhs)
if type(eq) is ps.field.Field.Access:
assert len(diff.atoms(ps.field.Field.Access)) == 1
else:
assert len(diff.atoms(ps.field.Field.Access)) == 2
def test_fvm_staggered_simplification():
D = sp.Symbol("D")
data_type = "float64"
c = ps.fields(f"c: {data_type}[2D]", layout='fzyx')
j = ps.fields(f"j(2): {data_type}[2D]", layout='fzyx', field_type=ps.FieldType.STAGGERED_FLUX)
grad_c = sp.Matrix([ps.fd.diff(c, i) for i in range(c.spatial_dimensions)])
ek = ps.fd.FVM1stOrder(c, flux=-D * grad_c)
ast = ps.create_staggered_kernel(ek.discrete_flux(j))
code = ps.get_code_str(ast)
assert '_size_c_0 - 1 < _size_c_0 - 1' not in code
import sympy
import pystencils.astnodes
from pystencils.backends.cbackend import CBackend
from pystencils.typing import TypedSymbol
class BogusDeclaration(pystencils.astnodes.Node):
"""Base class for all AST nodes."""
def __init__(self, parent=None):
self.parent = parent
@property
def args(self):
"""Returns all arguments/children of this node."""
return set()
@property
def symbols_defined(self):
"""Set of symbols which are defined by this node."""
return {TypedSymbol('Foo', 'double')}
@property
def undefined_symbols(self):
"""Symbols which are used but are not defined inside this node."""
set()
def subs(self, subs_dict):
"""Inplace! substitute, similar to sympy's but modifies the AST inplace."""
for a in self.args:
a.subs(subs_dict)
@property
def func(self):
return self.__class__
def atoms(self, arg_type):
"""Returns a set of all descendants recursively, which are an instance of the given type."""
result = set()
for arg in self.args:
if isinstance(arg, arg_type):
result.add(arg)
result.update(arg.atoms(arg_type))
return result
class BogusUsage(pystencils.astnodes.Node):
"""Base class for all AST nodes."""
def __init__(self, requires_global: bool, parent=None):
self.parent = parent
if requires_global:
self.required_global_declarations = [BogusDeclaration()]
@property
def args(self):
"""Returns all arguments/children of this node."""
return set()
@property
def symbols_defined(self):
"""Set of symbols which are defined by this node."""
return set()
@property
def undefined_symbols(self):
"""Symbols which are used but are not defined inside this node."""
return {TypedSymbol('Foo', 'double')}
def subs(self, subs_dict):
"""Inplace! substitute, similar to sympy's but modifies the AST inplace."""
for a in self.args:
a.subs(subs_dict)
@property
def func(self):
return self.__class__
def atoms(self, arg_type):
"""Returns a set of all descendants recursively, which are an instance of the given type."""
result = set()
for arg in self.args:
if isinstance(arg, arg_type):
result.add(arg)
result.update(arg.atoms(arg_type))
return result
def test_global_definitions_with_global_symbol():
# Teach our printer to print new ast nodes
CBackend._print_BogusUsage = lambda _, __: "// Bogus would go here"
CBackend._print_BogusDeclaration = lambda _, __: "// Declaration would go here"
z, x, y = pystencils.fields("z, y, x: [2d]")
normal_assignments = pystencils.AssignmentCollection([pystencils.Assignment(
z[0, 0], x[0, 0] * x[0, 0] * y[0, 0])], [])
ast = pystencils.create_kernel(normal_assignments)
print(pystencils.show_code(ast))
ast.body.append(BogusUsage(requires_global=True))
print(pystencils.show_code(ast))
kernel = ast.compile()
assert kernel is not None
assert TypedSymbol('Foo', 'double') not in [p.symbol for p in ast.get_parameters()]
def test_global_definitions_without_global_symbol():
# Teach our printer to print new ast nodes
CBackend._print_BogusUsage = lambda _, __: "// Bogus would go here"
CBackend._print_BogusDeclaration = lambda _, __: "// Declaration would go here"
z, x, y = pystencils.fields("z, y, x: [2d]")
normal_assignments = pystencils.AssignmentCollection([pystencils.Assignment(
z[0, 0], x[0, 0] * x[0, 0] * y[0, 0])], [])
ast = pystencils.create_kernel(normal_assignments)
print(pystencils.show_code(ast))
ast.body.append(BogusUsage(requires_global=False))
print(pystencils.show_code(ast))
kernel = ast.compile()
assert kernel is not None
assert TypedSymbol('Foo', 'double') in [p.symbol for p in ast.get_parameters()]
import pytest
import numpy as np import numpy as np
import sympy as sp import sympy as sp
from pystencils import Field, Assignment, fields import math
from pystencils.simp import sympy_cse_on_assignment_list
from pystencils.gpucuda.indexing import LineIndexing
from pystencils.slicing import remove_ghost_layers, add_ghost_layers, make_slice
from pystencils.gpucuda import make_python_function, create_cuda_kernel, BlockIndexing
import pycuda.gpuarray as gpuarray
from scipy.ndimage import convolve from scipy.ndimage import convolve
from pystencils import Assignment, Field, fields, CreateKernelConfig, create_kernel, Target, get_code_str
from pystencils.gpu import BlockIndexing
from pystencils.simp import sympy_cse_on_assignment_list
from pystencils.slicing import add_ghost_layers, make_slice, remove_ghost_layers, normalize_slice
try:
import cupy as cp
device_numbers = range(cp.cuda.runtime.getDeviceCount())
except ImportError:
device_numbers = []
cp = None
def test_averaging_kernel(): def test_averaging_kernel():
pytest.importorskip('cupy')
size = (40, 55) size = (40, 55)
src_arr = np.random.rand(*size) src_arr = np.random.rand(*size)
src_arr = add_ghost_layers(src_arr) src_arr = add_ghost_layers(src_arr)
...@@ -20,13 +30,14 @@ def test_averaging_kernel(): ...@@ -20,13 +30,14 @@ def test_averaging_kernel():
update_rule = Assignment(dst_field[0, 0], update_rule = Assignment(dst_field[0, 0],
(src_field[0, 1] + src_field[0, -1] + src_field[1, 0] + src_field[-1, 0]) / 4) (src_field[0, 1] + src_field[0, -1] + src_field[1, 0] + src_field[-1, 0]) / 4)
ast = create_cuda_kernel(sympy_cse_on_assignment_list([update_rule])) config = CreateKernelConfig(target=Target.GPU)
kernel = make_python_function(ast) ast = create_kernel(sympy_cse_on_assignment_list([update_rule]), config=config)
kernel = ast.compile()
gpu_src_arr = gpuarray.to_gpu(src_arr) gpu_src_arr = cp.asarray(src_arr)
gpu_dst_arr = gpuarray.to_gpu(dst_arr) gpu_dst_arr = cp.asarray(dst_arr)
kernel(src=gpu_src_arr, dst=gpu_dst_arr) kernel(src=gpu_src_arr, dst=gpu_dst_arr)
gpu_dst_arr.get(dst_arr) dst_arr = gpu_dst_arr.get()
stencil = np.array([[0, 1, 0], [1, 0, 1], [0, 1, 0]]) / 4.0 stencil = np.array([[0, 1, 0], [1, 0, 1], [0, 1, 0]]) / 4.0
reference = convolve(remove_ghost_layers(src_arr), stencil, mode='constant', cval=0.0) reference = convolve(remove_ghost_layers(src_arr), stencil, mode='constant', cval=0.0)
...@@ -35,24 +46,26 @@ def test_averaging_kernel(): ...@@ -35,24 +46,26 @@ def test_averaging_kernel():
def test_variable_sized_fields(): def test_variable_sized_fields():
pytest.importorskip('cupy')
src_field = Field.create_generic('src', spatial_dimensions=2) src_field = Field.create_generic('src', spatial_dimensions=2)
dst_field = Field.create_generic('dst', spatial_dimensions=2) dst_field = Field.create_generic('dst', spatial_dimensions=2)
update_rule = Assignment(dst_field[0, 0], update_rule = Assignment(dst_field[0, 0],
(src_field[0, 1] + src_field[0, -1] + src_field[1, 0] + src_field[-1, 0]) / 4) (src_field[0, 1] + src_field[0, -1] + src_field[1, 0] + src_field[-1, 0]) / 4)
ast = create_cuda_kernel(sympy_cse_on_assignment_list([update_rule])) config = CreateKernelConfig(target=Target.GPU)
kernel = make_python_function(ast) ast = create_kernel(sympy_cse_on_assignment_list([update_rule]), config=config)
kernel = ast.compile()
size = (3, 3) size = (3, 3)
src_arr = np.random.rand(*size) src_arr = np.random.rand(*size)
src_arr = add_ghost_layers(src_arr) src_arr = add_ghost_layers(src_arr)
dst_arr = np.zeros_like(src_arr) dst_arr = np.zeros_like(src_arr)
gpu_src_arr = gpuarray.to_gpu(src_arr) gpu_src_arr = cp.asarray(src_arr)
gpu_dst_arr = gpuarray.to_gpu(dst_arr) gpu_dst_arr = cp.asarray(dst_arr)
kernel(src=gpu_src_arr, dst=gpu_dst_arr) kernel(src=gpu_src_arr, dst=gpu_dst_arr)
gpu_dst_arr.get(dst_arr) dst_arr = gpu_dst_arr.get()
stencil = np.array([[0, 1, 0], [1, 0, 1], [0, 1, 0]]) / 4.0 stencil = np.array([[0, 1, 0], [1, 0, 1], [0, 1, 0]]) / 4.0
reference = convolve(remove_ghost_layers(src_arr), stencil, mode='constant', cval=0.0) reference = convolve(remove_ghost_layers(src_arr), stencil, mode='constant', cval=0.0)
...@@ -61,6 +74,7 @@ def test_variable_sized_fields(): ...@@ -61,6 +74,7 @@ def test_variable_sized_fields():
def test_multiple_index_dimensions(): def test_multiple_index_dimensions():
pytest.importorskip('cupy')
"""Sums along the last axis of a numpy array""" """Sums along the last axis of a numpy array"""
src_size = (7, 6, 4) src_size = (7, 6, 4)
dst_size = src_size[:2] dst_size = src_size[:2]
...@@ -74,13 +88,14 @@ def test_multiple_index_dimensions(): ...@@ -74,13 +88,14 @@ def test_multiple_index_dimensions():
update_rule = Assignment(dst_field[0, 0], update_rule = Assignment(dst_field[0, 0],
sum([src_field[offset[0], offset[1]](i) for i in range(src_size[-1])])) sum([src_field[offset[0], offset[1]](i) for i in range(src_size[-1])]))
ast = create_cuda_kernel([update_rule]) config = CreateKernelConfig(target=Target.GPU)
kernel = make_python_function(ast) ast = create_kernel([update_rule], config=config)
kernel = ast.compile()
gpu_src_arr = gpuarray.to_gpu(src_arr) gpu_src_arr = cp.asarray(src_arr)
gpu_dst_arr = gpuarray.to_gpu(dst_arr) gpu_dst_arr = cp.asarray(dst_arr)
kernel(src=gpu_src_arr, dst=gpu_dst_arr) kernel(src=gpu_src_arr, dst=gpu_dst_arr)
gpu_dst_arr.get(dst_arr) dst_arr = gpu_dst_arr.get()
reference = np.zeros_like(dst_arr) reference = np.zeros_like(dst_arr)
gl = np.max(np.abs(np.array(offset, dtype=int))) gl = np.max(np.abs(np.array(offset, dtype=int)))
...@@ -92,6 +107,7 @@ def test_multiple_index_dimensions(): ...@@ -92,6 +107,7 @@ def test_multiple_index_dimensions():
def test_ghost_layer(): def test_ghost_layer():
pytest.importorskip('cupy')
size = (6, 5) size = (6, 5)
src_arr = np.ones(size) src_arr = np.ones(size)
dst_arr = np.zeros_like(src_arr) dst_arr = np.zeros_like(src_arr)
...@@ -100,13 +116,15 @@ def test_ghost_layer(): ...@@ -100,13 +116,15 @@ def test_ghost_layer():
update_rule = Assignment(dst_field[0, 0], src_field[0, 0]) update_rule = Assignment(dst_field[0, 0], src_field[0, 0])
ghost_layers = [(1, 2), (2, 1)] ghost_layers = [(1, 2), (2, 1)]
ast = create_cuda_kernel([update_rule], ghost_layers=ghost_layers, indexing_creator=LineIndexing)
kernel = make_python_function(ast)
gpu_src_arr = gpuarray.to_gpu(src_arr) config = CreateKernelConfig(target=Target.GPU, ghost_layers=ghost_layers, gpu_indexing="line")
gpu_dst_arr = gpuarray.to_gpu(dst_arr) ast = create_kernel(sympy_cse_on_assignment_list([update_rule]), config=config)
kernel = ast.compile()
gpu_src_arr = cp.asarray(src_arr)
gpu_dst_arr = cp.asarray(dst_arr)
kernel(src=gpu_src_arr, dst=gpu_dst_arr) kernel(src=gpu_src_arr, dst=gpu_dst_arr)
gpu_dst_arr.get(dst_arr) dst_arr = gpu_dst_arr.get()
reference = np.zeros_like(src_arr) reference = np.zeros_like(src_arr)
reference[ghost_layers[0][0]:-ghost_layers[0][1], ghost_layers[1][0]:-ghost_layers[1][1]] = 1 reference[ghost_layers[0][0]:-ghost_layers[0][1], ghost_layers[1][0]:-ghost_layers[1][1]] = 1
...@@ -114,25 +132,29 @@ def test_ghost_layer(): ...@@ -114,25 +132,29 @@ def test_ghost_layer():
def test_setting_value(): def test_setting_value():
pytest.importorskip('cupy')
arr_cpu = np.arange(25, dtype=np.float64).reshape(5, 5) arr_cpu = np.arange(25, dtype=np.float64).reshape(5, 5)
arr_gpu = gpuarray.to_gpu(arr_cpu) arr_gpu = cp.asarray(arr_cpu)
iteration_slice = make_slice[:, :] iteration_slice = make_slice[:, :]
f = Field.create_generic("f", 2) f = Field.create_generic("f", 2)
update_rule = [Assignment(f(0), sp.Symbol("value"))] update_rule = [Assignment(f(0), sp.Symbol("value"))]
ast = create_cuda_kernel(update_rule, iteration_slice=iteration_slice, indexing_creator=LineIndexing)
kernel = make_python_function(ast) config = CreateKernelConfig(target=Target.GPU, gpu_indexing="line", iteration_slice=iteration_slice)
ast = create_kernel(sympy_cse_on_assignment_list(update_rule), config=config)
kernel = ast.compile()
kernel(f=arr_gpu, value=np.float64(42.0)) kernel(f=arr_gpu, value=np.float64(42.0))
np.testing.assert_equal(arr_gpu.get(), np.ones((5, 5)) * 42.0) np.testing.assert_equal(arr_gpu.get(), np.ones((5, 5)) * 42.0)
def test_periodicity(): def test_periodicity():
from pystencils.gpucuda.periodicity import get_periodic_boundary_functor as periodic_gpu pytest.importorskip('cupy')
from pystencils.gpu.periodicity import get_periodic_boundary_functor as periodic_gpu
from pystencils.slicing import get_periodic_boundary_functor as periodic_cpu from pystencils.slicing import get_periodic_boundary_functor as periodic_cpu
arr_cpu = np.arange(50, dtype=np.float64).reshape(5, 5, 2) arr_cpu = np.arange(50, dtype=np.float64).reshape(5, 5, 2)
arr_gpu = gpuarray.to_gpu(arr_cpu) arr_gpu = cp.asarray(arr_cpu)
periodicity_stencil = [(1, 0), (-1, 0), (1, 1)] periodicity_stencil = [(1, 0), (-1, 0), (1, 1)]
periodic_gpu_kernel = periodic_gpu(periodicity_stencil, (5, 5), 1, 2) periodic_gpu_kernel = periodic_gpu(periodicity_stencil, (5, 5), 1, 2)
...@@ -141,22 +163,95 @@ def test_periodicity(): ...@@ -141,22 +163,95 @@ def test_periodicity():
cpu_result = np.copy(arr_cpu) cpu_result = np.copy(arr_cpu)
periodic_cpu_kernel(cpu_result) periodic_cpu_kernel(cpu_result)
gpu_result = np.copy(arr_cpu)
periodic_gpu_kernel(pdfs=arr_gpu) periodic_gpu_kernel(pdfs=arr_gpu)
arr_gpu.get(gpu_result) gpu_result = arr_gpu.get()
np.testing.assert_equal(cpu_result, gpu_result) np.testing.assert_equal(cpu_result, gpu_result)
def test_block_size_limiting(): @pytest.mark.parametrize("device_number", device_numbers)
res = BlockIndexing.limit_block_size_to_device_maximum((4096, 4096, 4096)) def test_block_indexing(device_number):
assert all(r < 4096 for r in res) pytest.importorskip('cupy')
def test_block_indexing():
f = fields("f: [3D]") f = fields("f: [3D]")
bi = BlockIndexing(f, make_slice[:, :, :], block_size=(16, 8, 2), permute_block_size_dependent_on_layout=False) s = normalize_slice(make_slice[:, :, :], f.spatial_shape)
bi = BlockIndexing(s, f.layout, block_size=(16, 8, 2),
permute_block_size_dependent_on_layout=False)
assert bi.call_parameters((3, 2, 32))['block'] == (3, 2, 32) assert bi.call_parameters((3, 2, 32))['block'] == (3, 2, 32)
assert bi.call_parameters((32, 2, 32))['block'] == (16, 2, 8) assert bi.call_parameters((32, 2, 32))['block'] == (16, 2, 8)
bi = BlockIndexing(f, make_slice[:, :, :], block_size=(32, 1, 1), permute_block_size_dependent_on_layout=False) bi = BlockIndexing(s, f.layout, block_size=(32, 1, 1),
permute_block_size_dependent_on_layout=False)
assert bi.call_parameters((1, 16, 16))['block'] == (1, 16, 2) assert bi.call_parameters((1, 16, 16))['block'] == (1, 16, 2)
bi = BlockIndexing(s, f.layout, block_size=(16, 8, 2),
maximum_block_size="auto", device_number=device_number)
# This function should be used if number of needed registers is known. Can be determined with func.num_regs
registers_per_thread = 1000
blocks = bi.limit_block_size_by_register_restriction([1024, 1024, 1], registers_per_thread)
if cp.cuda.runtime.is_hip:
max_registers_per_block = cp.cuda.runtime.deviceGetAttribute(71, device_number)
else:
device = cp.cuda.Device(device_number)
da = device.attributes
max_registers_per_block = da.get("MaxRegistersPerBlock")
assert np.prod(blocks) * registers_per_thread < max_registers_per_block
@pytest.mark.parametrize('gpu_indexing', ("block", "line"))
@pytest.mark.parametrize('layout', ("C", "F"))
@pytest.mark.parametrize('shape', ((5, 5, 5, 5), (3, 17, 387, 4), (23, 44, 21, 11)))
def test_four_dimensional_kernel(gpu_indexing, layout, shape):
pytest.importorskip('cupy')
n_elements = np.prod(shape)
arr_cpu = np.arange(n_elements, dtype=np.float64).reshape(shape, order=layout)
arr_gpu = cp.asarray(arr_cpu)
iteration_slice = make_slice[:, :, :, :]
f = Field.create_from_numpy_array("f", arr_cpu)
update_rule = [Assignment(f.center, sp.Symbol("value"))]
config = CreateKernelConfig(target=Target.GPU, gpu_indexing=gpu_indexing, iteration_slice=iteration_slice)
ast = create_kernel(update_rule, config=config)
kernel = ast.compile()
kernel(f=arr_gpu, value=np.float64(42.0))
np.testing.assert_equal(arr_gpu.get(), np.ones(shape) * 42.0)
@pytest.mark.parametrize('start', (1, 5))
@pytest.mark.parametrize('end', (-1, -2, -3, -4))
@pytest.mark.parametrize('step', (1, 2, 3, 4))
@pytest.mark.parametrize('shape', ([55, 60], [77, 101, 80], [44, 64, 66]))
def test_guards_with_iteration_slices(start, end, step, shape):
iter_slice = tuple([slice(start, end, step)] * len(shape))
kernel_config_gpu = CreateKernelConfig(target=Target.GPU, iteration_slice=iter_slice)
field_1 = fields(f"f(1) : double{list(shape)}")
assignment = Assignment(field_1.center, 1)
ast = create_kernel(assignment, config=kernel_config_gpu)
code_str = get_code_str(ast)
test_strings = list()
iteration_ranges = list()
for i, s in enumerate(iter_slice):
e = ((shape[i] + end) - s.start) / s.step
e = math.ceil(e) + s.start
test_strings.append(f"{s.start} < {e}")
a = s.start
counter = 0
while a < e:
a += 1
counter += 1
iteration_ranges.append(counter)
# check if the expected if statement is in the GPU code
for s in test_strings:
assert s in code_str
# check if these bounds lead to same lengths as the range function would produce
for i in range(len(iter_slice)):
assert iteration_ranges[i] == len(range(iter_slice[i].start, shape[i] + end, iter_slice[i].step))
import pytest
import platform
import numpy as np
import pystencils as ps
@pytest.mark.parametrize('target', (ps.Target.CPU, ps.Target.GPU))
def test_half_precison(target):
if target == ps.Target.CPU:
if not platform.machine() in ['arm64', 'aarch64']:
pytest.xfail("skipping half precision test on non arm platform")
if 'clang' not in ps.cpu.cpujit.get_compiler_config()['command']:
pytest.xfail("skipping half precision because clang compiler is not used")
if target == ps.Target.GPU:
pytest.importorskip("cupy")
dh = ps.create_data_handling(domain_size=(10, 10), default_target=target)
f1 = dh.add_array("f1", values_per_cell=1, dtype=np.float16)
dh.fill("f1", 1.0, ghost_layers=True)
f2 = dh.add_array("f2", values_per_cell=1, dtype=np.float16)
dh.fill("f2", 2.0, ghost_layers=True)
f3 = dh.add_array("f3", values_per_cell=1, dtype=np.float16)
dh.fill("f3", 0.0, ghost_layers=True)
up = ps.Assignment(f3.center, f1.center + 2.1 * f2.center)
config = ps.CreateKernelConfig(target=dh.default_target, default_number_float=np.float32)
ast = ps.create_kernel(up, config=config)
kernel = ast.compile()
dh.run_kernel(kernel)
dh.all_to_cpu()
assert np.all(dh.cpu_arrays[f3.name] == 5.2)
assert dh.cpu_arrays[f3.name].dtype == np.float16
"""
"""
import pytest
from pystencils.astnodes import Block
from pystencils.backends.cbackend import CustomCodeNode, get_headers
def test_headers_have_quotes_or_brackets():
class ErrorNode1(CustomCodeNode):
def __init__(self):
super().__init__("", [], [])
self.headers = ["iostream"]
class ErrorNode2(CustomCodeNode):
headers = ["<iostream>", "foo"]
def __init__(self):
super().__init__("", [], [])
self.headers = ["<iostream>", "foo"]
class OkNode3(CustomCodeNode):
def __init__(self):
super().__init__("", [], [])
self.headers = ["<iostream>", '"foo"']
with pytest.raises(AssertionError, match='.* does not follow the pattern .*'):
get_headers(Block([ErrorNode1()]))
with pytest.raises(AssertionError, match='.* does not follow the pattern .*'):
get_headers(ErrorNode2())
get_headers(OkNode3())
import sympy as sp
import numpy as np
import pytest
import pystencils as ps
from pystencils import Assignment, Field, CreateKernelConfig, create_kernel, Target
from pystencils.transformations import filtered_tree_iteration
from pystencils.typing import BasicType, FieldPointerSymbol, PointerType, TypedSymbol
@pytest.mark.parametrize('target', [ps.Target.CPU, ps.Target.GPU])
def test_indexed_kernel(target):
if target == Target.GPU:
pytest.importorskip("cupy")
import cupy as cp
arr = np.zeros((3, 4))
dtype = np.dtype([('x', int), ('y', int), ('value', arr.dtype)])
index_arr = np.zeros((3,), dtype=dtype)
index_arr[0] = (0, 2, 3.0)
index_arr[1] = (1, 3, 42.0)
index_arr[2] = (2, 1, 5.0)
indexed_field = Field.create_from_numpy_array('index', index_arr)
normal_field = Field.create_from_numpy_array('f', arr)
update_rule = Assignment(normal_field[0, 0], indexed_field('value'))
config = CreateKernelConfig(target=target, index_fields=[indexed_field])
ast = create_kernel([update_rule], config=config)
kernel = ast.compile()
if target == Target.CPU:
kernel(f=arr, index=index_arr)
else:
gpu_arr = cp.asarray(arr)
gpu_index_arr = cp.ndarray(index_arr.shape, dtype=index_arr.dtype)
gpu_index_arr.set(index_arr)
kernel(f=gpu_arr, index=gpu_index_arr)
arr = gpu_arr.get()
for i in range(index_arr.shape[0]):
np.testing.assert_allclose(arr[index_arr[i]['x'], index_arr[i]['y']], index_arr[i]['value'], atol=1e-13)
@pytest.mark.parametrize('index_size', ("fixed", "variable"))
@pytest.mark.parametrize('array_size', ("3D", "2D", "10, 12", "13, 17, 19"))
@pytest.mark.parametrize('target', (ps.Target.CPU, ps.Target.GPU))
@pytest.mark.parametrize('dtype', ("float64", "float32"))
def test_indexed_domain_kernel(index_size, array_size, target, dtype):
dtype = BasicType(dtype)
f = ps.fields(f'f(1): {dtype.numpy_dtype.name}[{array_size}]')
g = ps.fields(f'g(1): {dtype.numpy_dtype.name}[{array_size}]')
index = TypedSymbol("index", dtype=BasicType(np.int16))
if index_size == "variable":
index_src = TypedSymbol("_size_src", dtype=BasicType(np.int16))
index_dst = TypedSymbol("_size_dst", dtype=BasicType(np.int16))
else:
index_src = 16
index_dst = 16
pointer_type = PointerType(dtype, const=False, restrict=True, double_pointer=True)
const_pointer_type = PointerType(dtype, const=True, restrict=True, double_pointer=True)
src = sp.IndexedBase(TypedSymbol(f"_data_{f.name}", dtype=const_pointer_type), shape=index_src)
dst = sp.IndexedBase(TypedSymbol(f"_data_{g.name}", dtype=pointer_type), shape=index_dst)
update_rule = [ps.Assignment(FieldPointerSymbol("f", dtype, const=True), src[index + 1]),
ps.Assignment(FieldPointerSymbol("g", dtype, const=False), dst[index + 1]),
ps.Assignment(g.center, f.center)]
ast = ps.create_kernel(update_rule, target=target)
code = ps.get_code_str(ast)
assert f"const {dtype.c_name} * RESTRICT _data_f = (({dtype.c_name} * RESTRICT const)(_data_f[index + 1]));" in code
assert f"{dtype.c_name} * RESTRICT _data_g = (({dtype.c_name} * RESTRICT )(_data_g[index + 1]));" in code
if target == Target.CPU:
assert code.count("for") == f.spatial_dimensions + 1
import numpy as np import numpy as np
from pystencils import show_code
from pystencils.transformations import move_constants_before_loop, make_loop_over_domain, resolve_field_accesses from pystencils import get_code_obj
from pystencils.field import Field from pystencils.astnodes import Block, KernelFunction, SympyAssignment
from pystencils.astnodes import SympyAssignment, Block
from pystencils.cpu import make_python_function from pystencils.cpu import make_python_function
from pystencils.field import Field
from pystencils.enums import Target, Backend
from pystencils.transformations import (
make_loop_over_domain, move_constants_before_loop, resolve_field_accesses)
def test_jacobi_fixed_field_size(): def test_jacobi_fixed_field_size():
...@@ -19,7 +22,8 @@ def test_jacobi_fixed_field_size(): ...@@ -19,7 +22,8 @@ def test_jacobi_fixed_field_size():
jacobi = SympyAssignment(d[0, 0], (f[1, 0] + f[-1, 0] + f[0, 1] + f[0, -1]) / 4) jacobi = SympyAssignment(d[0, 0], (f[1, 0] + f[-1, 0] + f[0, 1] + f[0, -1]) / 4)
body = Block([jacobi]) body = Block([jacobi])
ast_node = make_loop_over_domain(body, "kernel") loop_node, gl_info = make_loop_over_domain(body)
ast_node = KernelFunction(loop_node, Target.CPU, Backend.C, make_python_function, ghost_layers=gl_info)
resolve_field_accesses(ast_node) resolve_field_accesses(ast_node)
move_constants_before_loop(ast_node) move_constants_before_loop(ast_node)
...@@ -28,12 +32,12 @@ def test_jacobi_fixed_field_size(): ...@@ -28,12 +32,12 @@ def test_jacobi_fixed_field_size():
dst_field_py[x, y] = 0.25 * (src_field_py[x - 1, y] + src_field_py[x + 1, y] + dst_field_py[x, y] = 0.25 * (src_field_py[x - 1, y] + src_field_py[x + 1, y] +
src_field_py[x, y - 1] + src_field_py[x, y + 1]) src_field_py[x, y - 1] + src_field_py[x, y + 1])
kernel = make_python_function(ast_node) kernel = ast_node.compile()
kernel(f=src_field_c, d=dst_field_c) kernel(f=src_field_c, d=dst_field_c)
error = np.sum(np.abs(dst_field_py - dst_field_c)) error = np.sum(np.abs(dst_field_py - dst_field_c))
np.testing.assert_allclose(error, 0.0, atol=1e-13) np.testing.assert_allclose(error, 0.0, atol=1e-13)
code_display = show_code(ast_node) code_display = get_code_obj(ast_node)
assert 'for' in str(code_display) assert 'for' in str(code_display)
assert 'for' in code_display._repr_html_() assert 'for' in code_display._repr_html_()
...@@ -44,7 +48,8 @@ def test_jacobi_variable_field_size(): ...@@ -44,7 +48,8 @@ def test_jacobi_variable_field_size():
d = Field.create_generic("d", 3) d = Field.create_generic("d", 3)
jacobi = SympyAssignment(d[0, 0, 0], (f[1, 0, 0] + f[-1, 0, 0] + f[0, 1, 0] + f[0, -1, 0]) / 4) jacobi = SympyAssignment(d[0, 0, 0], (f[1, 0, 0] + f[-1, 0, 0] + f[0, 1, 0] + f[0, -1, 0]) / 4)
body = Block([jacobi]) body = Block([jacobi])
ast_node = make_loop_over_domain(body, "kernel") loop_node, gl_info = make_loop_over_domain(body)
ast_node = KernelFunction(loop_node, Target.CPU, Backend.C, make_python_function, ghost_layers=gl_info)
resolve_field_accesses(ast_node) resolve_field_accesses(ast_node)
move_constants_before_loop(ast_node) move_constants_before_loop(ast_node)
...@@ -53,13 +58,13 @@ def test_jacobi_variable_field_size(): ...@@ -53,13 +58,13 @@ def test_jacobi_variable_field_size():
dst_field_c = np.zeros(size) dst_field_c = np.zeros(size)
dst_field_py = np.zeros(size) dst_field_py = np.zeros(size)
for x in range(1, size[0]-1): for x in range(1, size[0] - 1):
for y in range(1, size[1]-1): for y in range(1, size[1] - 1):
for z in range(1, size[2]-1): for z in range(1, size[2] - 1):
dst_field_py[x, y, z] = 0.25 * (src_field_py[x - 1, y, z] + src_field_py[x + 1, y, z] + dst_field_py[x, y, z] = 0.25 * (src_field_py[x - 1, y, z] + src_field_py[x + 1, y, z] +
src_field_py[x, y - 1, z] + src_field_py[x, y + 1, z]) src_field_py[x, y - 1, z] + src_field_py[x, y + 1, z])
kernel = make_python_function(ast_node) kernel = ast_node.compile()
kernel(f=src_field_c, d=dst_field_c) kernel(f=src_field_c, d=dst_field_c)
error = np.sum(np.abs(dst_field_py-dst_field_c)) error = np.sum(np.abs(dst_field_py - dst_field_c))
np.testing.assert_allclose(error, 0.0, atol=1e-13) np.testing.assert_allclose(error, 0.0, atol=1e-13)
# -*- coding: utf-8 -*-
#
# Copyright © 2019 Stephan Seitz <stephan.seitz@fau.de>
#
# Distributed under terms of the GPLv3 license.
"""
"""
import sympy
import pystencils
from pystencils.backends.json import print_json, print_yaml, write_json, write_yaml
import tempfile
def test_json_backend():
z, y, x = pystencils.fields("z, y, x: [20,40]")
a = sympy.Symbol('a')
assignments = pystencils.AssignmentCollection({
z[0, 0]: x[0, 0] * a * x[0, 0] * y[0, 0]
})
ast = pystencils.create_kernel(assignments)
pj = print_json(ast)
# print(pj)
py = print_yaml(ast)
# print(py)
temp_dir = tempfile.TemporaryDirectory()
write_json(temp_dir.name + '/test.json', ast)
write_yaml(temp_dir.name + '/test.yaml', ast)
"""
Test the pystencils-specific JSON encoder and serializer as used in the Database class.
"""
import numpy as np
import tempfile
from pystencils.config import CreateKernelConfig
from pystencils import Target, Field
from pystencils.runhelper.db import Database, PystencilsJsonSerializer
def test_json_serializer():
dtype = np.float32
index_arr = np.zeros((3,), dtype=dtype)
indexed_field = Field.create_from_numpy_array('index', index_arr)
# create pystencils config
config = CreateKernelConfig(target=Target.CPU, function_name='dummy_config', data_type=dtype,
index_fields=[indexed_field])
# create dummy database
temp_dir = tempfile.TemporaryDirectory()
db = Database(file=temp_dir.name, serializer_info=('pystencils_serializer', PystencilsJsonSerializer))
db.save(params={'config': config}, result={'test': 'dummy'})
%% Cell type:code id: tags:
``` python
from pystencils.session import *
```
%% Cell type:code id: tags:
``` python
dh = ps.create_data_handling(domain_size=(256, 256), periodicity=True)
c_field = dh.add_array('c')
dh.fill("c", 0.0, ghost_layers=True)
```
%% Cell type:code id: tags:
``` python
for x in range(129):
for y in range(258):
dh.cpu_arrays['c'][x, y] = 1.0
```
%% Cell type:code id: tags:
``` python
plt.scalar_field(dh.cpu_arrays["c"])
```
%% Output
<matplotlib.image.AxesImage at 0x117081c10>
%% Cell type:code id: tags:
``` python
ur = ps.Assignment(c_field[0, 0], c_field[1, 0])
config = ps.CreateKernelConfig(target=dh.default_target, cpu_openmp=False, skip_independence_check=True)
ast = ps.create_kernel(ur, config=config)
kernel = ast.compile()
```
%% Cell type:code id: tags:
``` python
c_sync = dh.synchronization_function_cpu(['c'])
```
%% Cell type:code id: tags:
``` python
def timeloop(steps=10):
for i in range(steps):
c_sync()
dh.run_kernel(kernel)
return dh.gather_array('c')
```
%% Cell type:code id: tags:
``` python
ps.jupyter.set_display_mode('video')
```
%% Cell type:code id: tags:
``` python
ani = ps.plot.scalar_field_animation(timeloop, rescale=True, frames=12)
ps.jupyter.display_animation(ani)
```
%% Output
<IPython.core.display.HTML object>
%% Cell type:code id: tags:
``` python
ps.jupyter.set_display_mode('image_update')
```
%% Cell type:code id: tags:
``` python
ani = ps.plot.scalar_field_animation(timeloop, rescale=True, frames=12)
ps.jupyter.display_animation(ani)
```
%% Output
%% Cell type:code id: tags:
``` python
def grid_update_function(image):
for i in range(40):
c_sync()
dh.run_kernel(kernel)
return dh.gather_array('c')
```
%% Cell type:code id: tags:
``` python
animation = ps.jupyter.make_imshow_animation(dh.cpu_arrays["c"], grid_update_function, frames=300)
```
%% Output
%% Cell type:code id: tags:
``` python
ps.jupyter.set_display_mode("video")
ps.jupyter.set_display_mode("window")
ps.jupyter.set_display_mode("image_update")
ps.jupyter.activate_ipython()
```
%% Output
---------------------------------------------------------------------------
ImportError Traceback (most recent call last)
Cell In[14], line 2
1 ps.jupyter.set_display_mode("video")
----> 2 ps.jupyter.set_display_mode("window")
3 ps.jupyter.set_display_mode("image_update")
4 ps.jupyter.activate_ipython()
File ~/pystencils/pystencils/src/pystencils/jupyter.py:115, in set_display_mode(mode)
113 display_animation_func = display_as_html_video
114 elif animation_display_mode == 'window':
--> 115 ipython.magic("matplotlib qt")
116 display_animation_func = display_in_extra_window
117 elif animation_display_mode == 'image_update':
File /opt/local/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/IPython/core/interactiveshell.py:2539, in InteractiveShell.magic(self, arg_s)
2537 magic_name, _, magic_arg_s = arg_s.partition(' ')
2538 magic_name = magic_name.lstrip(prefilter.ESC_MAGIC)
-> 2539 return self.run_line_magic(magic_name, magic_arg_s, _stack_depth=2)
File /opt/local/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/IPython/core/interactiveshell.py:2417, in InteractiveShell.run_line_magic(self, magic_name, line, _stack_depth)
2415 kwargs['local_ns'] = self.get_local_scope(stack_depth)
2416 with self.builtin_trap:
-> 2417 result = fn(*args, **kwargs)
2419 # The code below prevents the output from being displayed
2420 # when using magics with decodator @output_can_be_silenced
2421 # when the last Python token in the expression is a ';'.
2422 if getattr(fn, magic.MAGIC_OUTPUT_CAN_BE_SILENCED, False):
File /opt/local/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/IPython/core/magics/pylab.py:99, in PylabMagics.matplotlib(self, line)
97 print("Available matplotlib backends: %s" % backends_list)
98 else:
---> 99 gui, backend = self.shell.enable_matplotlib(args.gui.lower() if isinstance(args.gui, str) else args.gui)
100 self._show_matplotlib_backend(args.gui, backend)
File /opt/local/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/IPython/core/interactiveshell.py:3603, in InteractiveShell.enable_matplotlib(self, gui)
3599 print('Warning: Cannot change to a different GUI toolkit: %s.'
3600 ' Using %s instead.' % (gui, self.pylab_gui_select))
3601 gui, backend = pt.find_gui_and_backend(self.pylab_gui_select)
-> 3603 pt.activate_matplotlib(backend)
3604 configure_inline_support(self, backend)
3606 # Now we must activate the gui pylab wants to use, and fix %run to take
3607 # plot updates into account
File /opt/local/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/IPython/core/pylabtools.py:360, in activate_matplotlib(backend)
355 # Due to circular imports, pyplot may be only partially initialised
356 # when this function runs.
357 # So avoid needing matplotlib attribute-lookup to access pyplot.
358 from matplotlib import pyplot as plt
--> 360 plt.switch_backend(backend)
362 plt.show._needmain = False
363 # We need to detect at runtime whether show() is called by the user.
364 # For this, we wrap it into a decorator which adds a 'called' flag.
File /opt/local/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/matplotlib/pyplot.py:271, in switch_backend(newbackend)
268 # have to escape the switch on access logic
269 old_backend = dict.__getitem__(rcParams, 'backend')
--> 271 backend_mod = importlib.import_module(
272 cbook._backend_module_name(newbackend))
274 required_framework = _get_required_interactive_framework(backend_mod)
275 if required_framework is not None:
File /opt/local/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/importlib/__init__.py:126, in import_module(name, package)
124 break
125 level += 1
--> 126 return _bootstrap._gcd_import(name[level:], package, level)
File <frozen importlib._bootstrap>:1204, in _gcd_import(name, package, level)
File <frozen importlib._bootstrap>:1176, in _find_and_load(name, import_)
File <frozen importlib._bootstrap>:1147, in _find_and_load_unlocked(name, import_)
File <frozen importlib._bootstrap>:690, in _load_unlocked(spec)
File <frozen importlib._bootstrap_external>:940, in exec_module(self, module)
File <frozen importlib._bootstrap>:241, in _call_with_frames_removed(f, *args, **kwds)
File /opt/local/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/matplotlib/backends/backend_qt5agg.py:7
4 from .. import backends
6 backends._QT_FORCE_QT5_BINDING = True
----> 7 from .backend_qtagg import ( # noqa: F401, E402 # pylint: disable=W0611
8 _BackendQTAgg, FigureCanvasQTAgg, FigureManagerQT, NavigationToolbar2QT,
9 FigureCanvasAgg, FigureCanvasQT)
12 @_BackendQTAgg.export
13 class _BackendQT5Agg(_BackendQTAgg):
14 pass
File /opt/local/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/matplotlib/backends/backend_qtagg.py:9
5 import ctypes
7 from matplotlib.transforms import Bbox
----> 9 from .qt_compat import QT_API, _enum
10 from .backend_agg import FigureCanvasAgg
11 from .backend_qt import QtCore, QtGui, _BackendQT, FigureCanvasQT
File /opt/local/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/matplotlib/backends/qt_compat.py:135
133 break
134 else:
--> 135 raise ImportError(
136 "Failed to import any of the following Qt binding modules: {}"
137 .format(", ".join(_ETS.values())))
138 else: # We should not get there.
139 raise AssertionError(f"Unexpected QT_API: {QT_API}")
ImportError: Failed to import any of the following Qt binding modules: PyQt6, PySide6, PyQt5, PySide2
import pytest
import numpy as np
import sympy as sp
import pystencils as ps
@pytest.mark.parametrize('dtype', ["float64", "float32"])
def test_log(dtype):
a = sp.Symbol("a")
x = ps.fields(f'x: {dtype}[1d]')
assignments = ps.AssignmentCollection({x.center(): sp.log(a)})
ast = ps.create_kernel(assignments)
code = ps.get_code_str(ast)
kernel = ast.compile()
# ps.show_code(ast)
if dtype == "float64":
assert "float" not in code
array = np.zeros((10,), dtype=dtype)
kernel(x=array, a=100)
assert np.allclose(array, 4.60517019)
import sympy as sp
import numpy as np import numpy as np
import sympy as sp
import pytest
import pystencils as ps import pystencils as ps
from pystencils import Field import pystencils.astnodes as ast
from pystencils.cpu import create_kernel, make_python_function from pystencils.field import Field, FieldType
from pystencils.astnodes import Conditional, LoopOverCoordinate, SympyAssignment from pystencils.astnodes import Conditional, LoopOverCoordinate, SympyAssignment
from pystencils.cpu import create_kernel, make_python_function
from pystencils.kernelcreation import create_staggered_kernel from pystencils.kernelcreation import create_staggered_kernel
from pystencils.transformations import move_constants_before_loop from pystencils.transformations import (
import pystencils.astnodes as ast cleanup_blocks, cut_loop, move_constants_before_loop, simplify_conditionals)
from pystencils.transformations import simplify_conditionals, cleanup_blocks, cut_loop
def offsets_in_plane(normal_plane, offset_int, dimension): def offsets_in_plane(normal_plane, offset_int, dimension):
...@@ -33,9 +36,9 @@ def test_staggered_iteration(): ...@@ -33,9 +36,9 @@ def test_staggered_iteration():
s_arr_ref = s_arr.copy() s_arr_ref = s_arr.copy()
fields_fixed = (Field.create_from_numpy_array('f', f_arr), fields_fixed = (Field.create_from_numpy_array('f', f_arr),
Field.create_from_numpy_array('s', s_arr, index_dimensions=1)) Field.create_from_numpy_array('s', s_arr, index_dimensions=1, field_type=FieldType.STAGGERED))
fields_var = (Field.create_generic('f', 2), fields_var = (Field.create_generic('f', 2),
Field.create_generic('s', 2, index_dimensions=1)) Field.create_generic('s', 2, index_dimensions=1, index_shape=(dim,), field_type=FieldType.STAGGERED))
for f, s in [fields_var, fields_fixed]: for f, s in [fields_var, fields_fixed]:
# --- Manual # --- Manual
...@@ -47,14 +50,18 @@ def test_staggered_iteration(): ...@@ -47,14 +50,18 @@ def test_staggered_iteration():
sum(f[o] for o in offsets_in_plane(d, -1, dim))) sum(f[o] for o in offsets_in_plane(d, -1, dim)))
cond = sp.And(*[conditions[i] for i in range(dim) if d != i]) cond = sp.And(*[conditions[i] for i in range(dim) if d != i])
eqs.append(Conditional(cond, eq)) eqs.append(Conditional(cond, eq))
func = create_kernel(eqs, ghost_layers=[(1, 0), (1, 0), (1, 0)]).compile() # TODO: correct type hint
config = ps.CreateKernelConfig(target=ps.Target.CPU, ghost_layers=[(1, 0), (1, 0), (1, 0)])
func = ps.create_kernel(eqs, config=config).compile()
# --- Built-in optimized # --- Built-in optimized
expressions = [] expressions = []
for d in range(dim): for d in range(dim):
expressions.append(sum(f[o] for o in offsets_in_plane(d, 0, dim)) - expressions.append(sum(f[o] for o in offsets_in_plane(d, 0, dim)) -
sum(f[o] for o in offsets_in_plane(d, -1, dim))) sum(f[o] for o in offsets_in_plane(d, -1, dim)))
func_optimized = create_staggered_kernel(s, expressions).compile() assignments = [ps.Assignment(s.staggered_access(d), expressions[i]) for i, d in enumerate(s.staggered_stencil)]
func_optimized = create_staggered_kernel(assignments).compile()
pytest.importorskip('islpy')
assert not func_optimized.ast.atoms(Conditional), "Loop cutting optimization did not work" assert not func_optimized.ast.atoms(Conditional), "Loop cutting optimization did not work"
func(f=f_arr, s=s_arr_ref) func(f=f_arr, s=s_arr_ref)
...@@ -69,12 +76,13 @@ def test_staggered_iteration_manual(): ...@@ -69,12 +76,13 @@ def test_staggered_iteration_manual():
s_arr_ref = s_arr.copy() s_arr_ref = s_arr.copy()
f = Field.create_from_numpy_array('f', f_arr) f = Field.create_from_numpy_array('f', f_arr)
s = Field.create_from_numpy_array('s', s_arr, index_dimensions=1) s = Field.create_from_numpy_array('s', s_arr, index_dimensions=1, field_type=FieldType.STAGGERED)
eqs = [] eqs = []
counters = [LoopOverCoordinate.get_loop_counter_symbol(i) for i in range(dim)] counters = [LoopOverCoordinate.get_loop_counter_symbol(i) for i in range(dim)]
conditions = [counters[i] < f.shape[i] - 1 for i in range(dim)] conditions = [counters[i] < f.shape[i] - 1 for i in range(dim)]
conditions2 = counters[0] > f.shape[0] + 5
for d in range(dim): for d in range(dim):
eq = SympyAssignment(s(d), sum(f[o] for o in offsets_in_plane(d, 0, dim)) - eq = SympyAssignment(s(d), sum(f[o] for o in offsets_in_plane(d, 0, dim)) -
...@@ -82,7 +90,13 @@ def test_staggered_iteration_manual(): ...@@ -82,7 +90,13 @@ def test_staggered_iteration_manual():
cond = sp.And(*[conditions[i] for i in range(dim) if d != i]) cond = sp.And(*[conditions[i] for i in range(dim) if d != i])
eqs.append(Conditional(cond, eq)) eqs.append(Conditional(cond, eq))
kernel_ast = create_kernel(eqs, ghost_layers=[(1, 0), (1, 0), (1, 0)]) # this conditional should vanish entirely because it is never true
eq = SympyAssignment(s(0), f[0, 0])
cond = sp.And(*[conditions2])
eqs.append(Conditional(cond, eq))
config = ps.CreateKernelConfig(target=ps.Target.CPU, ghost_layers=[(1, 0), (1, 0), (1, 0)])
kernel_ast = ps.create_kernel(eqs, config=config)
func = make_python_function(kernel_ast) func = make_python_function(kernel_ast)
func(f=f_arr, s=s_arr_ref) func(f=f_arr, s=s_arr_ref)
...@@ -97,6 +111,7 @@ def test_staggered_iteration_manual(): ...@@ -97,6 +111,7 @@ def test_staggered_iteration_manual():
move_constants_before_loop(kernel_ast.body) move_constants_before_loop(kernel_ast.body)
cleanup_blocks(kernel_ast.body) cleanup_blocks(kernel_ast.body)
pytest.importorskip('islpy')
assert not kernel_ast.atoms(Conditional), "Loop cutting optimization did not work" assert not kernel_ast.atoms(Conditional), "Loop cutting optimization did not work"
func_optimized = make_python_function(kernel_ast) func_optimized = make_python_function(kernel_ast)
...@@ -106,11 +121,14 @@ def test_staggered_iteration_manual(): ...@@ -106,11 +121,14 @@ def test_staggered_iteration_manual():
def test_staggered_gpu(): def test_staggered_gpu():
dim = 2 dim = 2
f, s = ps.fields("f, s({dim}): double[{dim}D]".format(dim=dim)) f = ps.fields(f"f: double[{dim}D]")
s = ps.fields("s({dim}): double[{dim}D]".format(dim=dim), field_type=FieldType.STAGGERED)
expressions = [(f[0, 0] + f[-1, 0]) / 2, expressions = [(f[0, 0] + f[-1, 0]) / 2,
(f[0, 0] + f[0, -1]) / 2] (f[0, 0] + f[0, -1]) / 2]
kernel_ast = ps.create_staggered_kernel(s, expressions, target='gpu', gpu_exclusive_conditions=True) assignments = [ps.Assignment(s.staggered_access(d), expressions[i]) for i, d in enumerate(s.staggered_stencil)]
kernel_ast = ps.create_staggered_kernel(assignments, target=ps.Target.GPU, gpu_exclusive_conditions=True)
assert len(kernel_ast.atoms(Conditional)) == 4 assert len(kernel_ast.atoms(Conditional)) == 4
kernel_ast = ps.create_staggered_kernel(s, expressions, target='gpu', gpu_exclusive_conditions=False) assignments = [ps.Assignment(s.staggered_access(d), expressions[i]) for i, d in enumerate(s.staggered_stencil)]
kernel_ast = ps.create_staggered_kernel(assignments, target=ps.Target.GPU, gpu_exclusive_conditions=False)
assert len(kernel_ast.atoms(Conditional)) == 3 assert len(kernel_ast.atoms(Conditional)) == 3
# -*- coding: utf-8 -*-
#
# Copyright © 2019 Stephan Seitz <stephan.seitz@fau.de>
#
# Distributed under terms of the GPLv3 license.
"""
"""
import sympy as sp
import pystencils
from pystencils.typing import TypedSymbol, BasicType
def test_wild_typed_symbol():
x = pystencils.fields('x: float32[3d]')
typed_symbol = TypedSymbol('a', BasicType('float64'))
assert x.center().match(sp.Wild('w1'))
assert typed_symbol.match(sp.Wild('w1'))
wild_ceiling = sp.ceiling(sp.Wild('w1'))
assert sp.ceiling(x.center()).match(wild_ceiling)
assert sp.ceiling(typed_symbol).match(wild_ceiling)
def test_replace_and_subs_for_assignment_collection():
x, y = pystencils.fields('x, y: float32[3d]')
a, b, c, d = sp.symbols('a, b, c, d')
assignments = pystencils.AssignmentCollection({
a: sp.floor(1),
b: 2,
c: a + c,
y.center(): sp.ceiling(x.center()) + sp.floor(x.center())
})
expected_assignments = pystencils.AssignmentCollection({
a: sp.floor(3),
b: 2,
c: a + c,
y.center(): sp.ceiling(x.center()) + sp.floor(x.center())
})
assert expected_assignments == assignments.replace(1, 3)
assert expected_assignments == assignments.subs({1: 3})
expected_assignments = pystencils.AssignmentCollection({
d: sp.floor(1),
b: 2,
c: d + c,
y.center(): sp.ceiling(x.center()) + sp.floor(x.center())
})
print(expected_assignments)
print(assignments.subs(a, d))
assert expected_assignments == assignments.subs(a, d)
def test_match_for_assignment_collection():
x, y = pystencils.fields('x, y: float32[3d]')
a, b, c, d = sp.symbols('a, b, c, d')
assignments = pystencils.AssignmentCollection({
a: sp.floor(1),
b: 2,
c: a + c,
y.center(): sp.ceiling(x.center()) + sp.floor(x.center())
})
w1 = sp.Wild('w1')
w2 = sp.Wild('w2')
w3 = sp.Wild('w3')
wild_ceiling = sp.ceiling(w1)
wild_addition = w1 + w2
assert assignments.match(pystencils.Assignment(w3, wild_ceiling + w2))[w1] == x.center()
assert assignments.match(pystencils.Assignment(w3, wild_ceiling + w2)) == {
w3: y.center(),
w2: sp.floor(x.center()),
w1: x.center()
}
assert assignments.find(wild_ceiling) == {sp.ceiling(x.center())}
assert len([a for a in assignments.find(wild_addition) if isinstance(a, sp.Add)]) == 2
import pytest
import sympy as sp
import numpy as np
import pystencils as ps
from pystencils.fast_approximation import fast_division
@pytest.mark.parametrize('dtype', ["float64", "float32"])
@pytest.mark.parametrize('func', [sp.Pow, sp.atan2])
@pytest.mark.parametrize('target', [ps.Target.CPU, ps.Target.GPU])
def test_two_arguments(dtype, func, target):
if target == ps.Target.GPU:
pytest.importorskip("cupy")
dh = ps.create_data_handling(domain_size=(10, 10), periodicity=True, default_target=target)
x = dh.add_array('x', values_per_cell=1, dtype=dtype)
dh.fill("x", 0.0, ghost_layers=True)
y = dh.add_array('y', values_per_cell=1, dtype=dtype)
dh.fill("y", 1.0, ghost_layers=True)
z = dh.add_array('z', values_per_cell=1, dtype=dtype)
dh.fill("z", 2.0, ghost_layers=True)
config = ps.CreateKernelConfig(target=target)
# test sp.Max with one argument
up = ps.Assignment(x.center, func(y.center, z.center))
ast = ps.create_kernel(up, config=config)
code = ps.get_code_str(ast)
if dtype == 'float32':
assert func.__name__.lower() in code
kernel = ast.compile()
dh.all_to_gpu()
dh.run_kernel(kernel)
dh.all_to_cpu()
np.testing.assert_allclose(dh.gather_array("x")[0, 0], float(func(1.0, 2.0).evalf()),
13 if dtype == 'float64' else 5)
@pytest.mark.parametrize('dtype', ["float64", "float32"])
@pytest.mark.parametrize('func', [sp.sin, sp.cos, sp.sinh, sp.cosh, sp.atan, sp.floor, sp.ceiling])
@pytest.mark.parametrize('target', [ps.Target.CPU, ps.Target.GPU])
def test_single_arguments(dtype, func, target):
if target == ps.Target.GPU:
pytest.importorskip("cupy")
dh = ps.create_data_handling(domain_size=(10, 10), periodicity=True, default_target=target)
x = dh.add_array('x', values_per_cell=1, dtype=dtype)
dh.fill("x", 0.0, ghost_layers=True)
y = dh.add_array('y', values_per_cell=1, dtype=dtype)
dh.fill("y", 1.0, ghost_layers=True)
config = ps.CreateKernelConfig(target=target)
# test sp.Max with one argument
up = ps.Assignment(x.center, func(y.center))
ast = ps.create_kernel(up, config=config)
code = ps.get_code_str(ast)
if dtype == 'float32':
func_name = func.__name__.lower() if func is not sp.ceiling else "ceil"
assert func_name in code
kernel = ast.compile()
dh.all_to_gpu()
dh.run_kernel(kernel)
dh.all_to_cpu()
np.testing.assert_allclose(dh.gather_array("x")[0, 0], float(func(1.0).evalf()),
rtol=10**-3 if dtype == 'float32' else 10**-5)
@pytest.mark.parametrize('a', [sp.Symbol('a'), ps.fields('a: float64[2d]').center])
def test_avoid_pow(a):
x = ps.fields('x: float64[2d]')
up = ps.Assignment(x.center_vector[0], 2 * a ** 2 / 3)
ast = ps.create_kernel(up)
code = ps.get_code_str(ast)
assert "pow" not in code
def test_avoid_pow_fast_div():
x = ps.fields('x: float64[2d]')
a = ps.fields('a: float64[2d]').center
up = ps.Assignment(x.center_vector[0], fast_division(1, (a**2)))
ast = ps.create_kernel(up, config=ps.CreateKernelConfig(target=ps.Target.GPU))
# ps.show_code(ast)
code = ps.get_code_str(ast)
assert "pow" not in code
def test_avoid_pow_move_constants():
# At the end of the kernel creation the function move_constants_before_loop will be called
# This function additionally contains substitutions for symbols with the same value
# Thus it simplifies the equations again
x = ps.fields('x: float64[2d]')
a, b, c = sp.symbols("a, b, c")
up = [ps.Assignment(a, 0.0),
ps.Assignment(b, 0.0),
ps.Assignment(c, 0.0),
ps.Assignment(x.center_vector[0], a**2/18 - a*b/6 - a/18 + b**2/18 + b/18 - c**2/36)]
ast = ps.create_kernel(up)
code = ps.get_code_str(ast)
ps.show_code(ast)
assert "pow" not in code
import pytest
import numpy as np
import sympy as sp
import pystencils as ps
from pystencils.astnodes import LoopOverCoordinate, Conditional, Block, SympyAssignment
SLICE_LIST = [False,
ps.make_slice[1:-1:2, 1:-1:2],
ps.make_slice[2:-1:2, 4:-1:7],
ps.make_slice[4:-1:2, 5:-1:2],
ps.make_slice[3:-1:4, 7:-1:3]]
@pytest.mark.parametrize('target', [ps.Target.CPU, ps.Target.GPU])
@pytest.mark.parametrize('iteration_slice', SLICE_LIST)
def test_mod(target, iteration_slice):
if target == ps.Target.GPU:
pytest.importorskip("cupy")
dh = ps.create_data_handling(domain_size=(51, 51), periodicity=True, default_target=target)
loop_ctrs = [LoopOverCoordinate.get_loop_counter_symbol(i) for i in range(dh.dim)]
cond = [sp.Eq(sp.Mod(loop_ctrs[i], 2), 1) for i in range(dh.dim)]
field = dh.add_array("a", values_per_cell=1)
eq_list = [SympyAssignment(field.center, 1.0)]
if iteration_slice:
config = ps.CreateKernelConfig(target=dh.default_target, iteration_slice=iteration_slice)
assign = eq_list
else:
assign = [Conditional(sp.And(*cond), Block(eq_list))]
config = ps.CreateKernelConfig(target=dh.default_target)
kernel = ps.create_kernel(assign, config=config).compile()
dh.fill(field.name, 0, ghost_layers=True)
if config.target == ps.enums.Target.GPU:
dh.to_gpu(field.name)
dh.run_kernel(kernel)
if config.target == ps.enums.Target.GPU:
dh.to_cpu(field.name)
result = dh.gather_array(field.name, ghost_layers=True)
assert np.all(result[iteration_slice] == 1.0)
import pystencils as ps
import numpy as np import numpy as np
from pystencils.astnodes import LoopOverCoordinate, Block, SympyAssignment, TypedSymbol
import pystencils as ps
from pystencils.astnodes import Block, LoopOverCoordinate, SympyAssignment, TypedSymbol
from pystencils.transformations import move_constants_before_loop from pystencils.transformations import move_constants_before_loop
...@@ -24,7 +25,40 @@ def test_symbol_renaming(): ...@@ -24,7 +25,40 @@ def test_symbol_renaming():
loops = block.atoms(LoopOverCoordinate) loops = block.atoms(LoopOverCoordinate)
assert len(loops) == 2 assert len(loops) == 2
assert len(block.args[1].body.args) == 1
assert len(block.args[3].body.args) == 2
for loop in loops: for loop in loops:
assert len(loop.body.args) == 1
assert len(loop.parent.args) == 4 # 2 loops + 2 subexpressions assert len(loop.parent.args) == 4 # 2 loops + 2 subexpressions
assert loop.parent.args[0].lhs.name != loop.parent.args[1].lhs.name assert loop.parent.args[0].lhs.name != loop.parent.args[2].lhs.name
def test_keep_order_of_accesses():
f = ps.fields("f: [1D]")
x = TypedSymbol("x", np.float64)
n = 5
loop = LoopOverCoordinate(Block([SympyAssignment(x, f[0]),
SympyAssignment(f[1], 2 * x)]),
0, 0, n)
block = Block([loop])
ps.transformations.resolve_field_accesses(block)
new_loops = ps.transformations.cut_loop(loop, [n - 1])
ps.transformations.move_constants_before_loop(new_loops.args[1])
kernel_func = ps.astnodes.KernelFunction(
block, ps.Target.CPU, ps.Backend.C, ps.cpu.cpujit.make_python_function, None
)
kernel = kernel_func.compile()
print(ps.show_code(kernel_func))
f_arr = np.ones(n + 1)
kernel(f=f_arr)
print(f_arr)
assert np.allclose(f_arr, np.array([
1, 2, 4, 8, 16, 32
]))
import sympy as sp
from pystencils import AssignmentCollection, Assignment
from pystencils.node_collection import NodeCollection
from pystencils.astnodes import SympyAssignment
def test_node_collection_from_assignment_collection():
x = sp.symbols('x')
assignment_collection = AssignmentCollection([Assignment(x, 2)])
node_collection = NodeCollection.from_assignment_collection(assignment_collection)
assert node_collection.all_assignments[0] == SympyAssignment(x, 2)
import io import io
import json import json
from http.server import HTTPServer, BaseHTTPRequestHandler from http.server import BaseHTTPRequestHandler, HTTPServer
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from pystencils.runhelper import ParameterStudy from pystencils.runhelper import ParameterStudy
......
%% Cell type:code id: tags:
``` python
import pytest
pytest.importorskip('cupy')
```
%% Output
<module 'cupy' from '/home/markus/Python311/lib/python3.11/site-packages/cupy/__init__.py'>
%% Cell type:code id: tags:
``` python
from pystencils.session import *
sp.init_printing()
frac = sp.Rational
```
%% Cell type:markdown id: tags:
# Phase-field simulation of dentritic solidification in 3D
This notebook tests the model presented in the dentritic growth tutorial in 3D.
%% Cell type:code id: tags:
``` python
target = ps.Target.GPU
gpu = target == ps.Target.GPU
domain_size = (25, 25, 25) if 'is_test_run' in globals() else (300, 300, 300)
dh = ps.create_data_handling(domain_size=domain_size, periodicity=True, default_target=target)
φ_field = dh.add_array('phi', latex_name='φ')
φ_field_tmp = dh.add_array('phi_tmp', latex_name='φ_tmp')
φ_delta_field = dh.add_array('phidelta', latex_name='φ_D')
t_field = dh.add_array('T')
t_field_tmp = dh.add_array('T_tmp')
```
%% Cell type:code id: tags:
``` python
ε, m, δ, j, θzero, α, γ, Teq, κ, τ = sp.symbols("ε m δ j θ_0 α γ T_eq κ τ")
εb = sp.Symbol("\\bar{\\epsilon}")
discretize = ps.fd.Discretization2ndOrder(dx=0.03, dt=1e-5)
φ = φ_field.center
T = t_field.center
d = ps.fd.Diff
def f(φ, m):
return φ**4 / 4 - (frac(1, 2) - m/3) * φ**3 + (frac(1,4)-m/2)*φ**2
bulk_free_energy_density = f(φ, m)
interface_free_energy_density = ε ** 2 / 2 * (d(φ, 0) ** 2 + d(φ, 1) ** 2 + d(φ, 2) ** 2)
```
%% Cell type:markdown id: tags:
Here comes the major change, that has to be made for the 3D model: $\epsilon$ depends on the interface normal, which can not be computed simply as atan() as in the 2D case
%% Cell type:code id: tags:
``` python
n = sp.Matrix([d(φ, i) for i in range(3)])
nLen = sp.sqrt(sum(n_i**2 for n_i in n))
n = n / nLen
nVal = sum(n_i**4 for n_i in n)
σ = δ * nVal
εVal = εb * (1 + σ)
εVal
```
%% Output
$\displaystyle \bar{\epsilon} \left(δ \left(\frac{{\partial_{0} {φ}_{(0,0,0)}}^{4}}{\left({\partial_{0} {φ}_{(0,0,0)}}^{2} + {\partial_{1} {φ}_{(0,0,0)}}^{2} + {\partial_{2} {φ}_{(0,0,0)}}^{2}\right)^{2}} + \frac{{\partial_{1} {φ}_{(0,0,0)}}^{4}}{\left({\partial_{0} {φ}_{(0,0,0)}}^{2} + {\partial_{1} {φ}_{(0,0,0)}}^{2} + {\partial_{2} {φ}_{(0,0,0)}}^{2}\right)^{2}} + \frac{{\partial_{2} {φ}_{(0,0,0)}}^{4}}{\left({\partial_{0} {φ}_{(0,0,0)}}^{2} + {\partial_{1} {φ}_{(0,0,0)}}^{2} + {\partial_{2} {φ}_{(0,0,0)}}^{2}\right)^{2}}\right) + 1\right)$
⎛ ⎛ 4
⎜ ⎜ D(φ[0,0,0])
\bar{\epsilon}⋅⎜δ⋅⎜───────────────────────────────────────────── + ───────────
⎜ ⎜ 2
⎜ ⎜⎛ 2 2 2⎞ ⎛
⎝ ⎝⎝D(φ[0,0,0]) + D(φ[0,0,0]) + D(φ[0,0,0]) ⎠ ⎝D(φ[0,0,0]
4 4
D(φ[0,0,0]) D(φ[0,0,0])
────────────────────────────────── + ─────────────────────────────────────────
2
2 2 2⎞ ⎛ 2 2
) + D(φ[0,0,0]) + D(φ[0,0,0]) ⎠ ⎝D(φ[0,0,0]) + D(φ[0,0,0]) + D(φ[0,0,0]
⎞ ⎞
⎟ ⎟
────⎟ + 1⎟
2⎟ ⎟
2⎞ ⎟ ⎟
) ⎠ ⎠ ⎠
%% Cell type:code id: tags:
``` python
def m_func(temperature):
return (α / sp.pi) * sp.atan(γ * (Teq - temperature))
```
%% Cell type:code id: tags:
``` python
substitutions = {m: m_func(T),
ε: εVal}
fe_i = interface_free_energy_density.subs(substitutions)
fe_b = bulk_free_energy_density.subs(substitutions)
μ_if = ps.fd.expand_diff_full(ps.fd.functional_derivative(fe_i, φ), functions=[φ])
μ_b = ps.fd.expand_diff_full(ps.fd.functional_derivative(fe_b, φ), functions=[φ])
```
%% Cell type:code id: tags:
``` python
dF_dφ = μ_b + sp.Piecewise((μ_if, nLen**2 > 1e-10), (0, True))
```
%% Cell type:code id: tags:
``` python
parameters = {
τ: 0.0003,
κ: 1.8,
εb: 0.01,
δ: 0.3,
γ: 10,
j: 6,
α: 0.9,
Teq: 1.0,
θzero: 0.2,
sp.pi: sp.pi.evalf()
}
parameters
```
%% Output
$\displaystyle \left\{ \pi : 3.14159265358979, \ T_{eq} : 1.0, \ \bar{\epsilon} : 0.01, \ j : 6, \ α : 0.9, \ γ : 10, \ δ : 0.3, \ θ_{0} : 0.2, \ κ : 1.8, \ τ : 0.0003\right\}$
{π: 3.14159265358979, T_eq: 1.0, \bar{\epsilon}: 0.01, j: 6, α: 0.9, γ: 10, δ:
0.3, θ₀: 0.2, κ: 1.8, τ: 0.0003}
%% Cell type:code id: tags:
``` python
dφ_dt = - dF_dφ / τ
assignments = [
ps.Assignment(φ_delta_field.center, discretize(dφ_dt.subs(parameters))),
]
φEqs = ps.simp.sympy_cse_on_assignment_list(assignments)
φEqs.append(ps.Assignment(φ_field_tmp.center, discretize(ps.fd.transient(φ) - φ_delta_field.center)))
temperatureEvolution = -ps.fd.transient(T) + ps.fd.diffusion(T, 1) + κ * φ_delta_field.center
temperatureEqs = [
ps.Assignment(t_field_tmp.center, discretize(temperatureEvolution.subs(parameters)))
]
```
%% Cell type:code id: tags:
``` python
φ_kernel = ps.create_kernel(φEqs, cpu_openmp=4, target=target).compile()
temperatureKernel = ps.create_kernel(temperatureEqs, cpu_openmp=4, target=target).compile()
```
%% Cell type:code id: tags:
``` python
def time_loop(steps):
φ_sync = dh.synchronization_function(['phi'], target=target)
temperature_sync = dh.synchronization_function(['T'], target=target)
dh.all_to_gpu()
for t in range(steps):
φ_sync()
dh.run_kernel(φ_kernel)
temperature_sync()
dh.run_kernel(temperatureKernel)
dh.swap(φ_field.name, φ_field_tmp.name)
dh.swap(t_field.name, t_field_tmp.name)
dh.all_to_cpu()
def init(nucleus_size=np.sqrt(5)):
for b in dh.iterate():
x, y, z = b.cell_index_arrays
x, y, z = x - b.shape[0] // 2, y - b.shape[1] // 2, z - b.shape[2] // 2
b['phi'].fill(0)
b['phi'][(x ** 2 + y ** 2 + z ** 2) < nucleus_size ** 2] = 1.0
b['T'].fill(0.0)
def plot(slice_obj=ps.make_slice[:, :, 0.5]):
plt.subplot(1, 3, 1)
plt.scalar_field(dh.gather_array('phi', slice_obj).squeeze())
plt.title("φ")
plt.colorbar()
plt.subplot(1, 3, 2)
plt.title("T")
plt.scalar_field(dh.gather_array('T', slice_obj).squeeze())
plt.colorbar()
plt.subplot(1, 3, 3)
plt.title("∂φ")
plt.scalar_field(dh.gather_array('phidelta', slice_obj).squeeze())
plt.colorbar()
```
%% Cell type:code id: tags:
``` python
init()
plot()
print(dh)
```
%% Output
Name| Inner (min/max)| WithGl (min/max)
----------------------------------------------------
T| ( 0, 0)| ( 0, 0)
T_tmp| ( 0, 0)| ( 0, 0)
phi| ( 0, 1)| ( 0, 1)
phi_tmp| ( 0, 0)| ( 0, 0)
phidelta| ( 0, 0)| ( 0, 0)
%% Cell type:code id: tags:
``` python
if 'is_test_run' in globals():
time_loop(2)
assert np.isfinite(dh.max('phi'))
assert np.isfinite(dh.max('T'))
assert np.isfinite(dh.max('phidelta'))
else:
from time import perf_counter
vtk_writer = dh.create_vtk_writer('dentritic_growth_large', ['phi'])
last = perf_counter()
for i in range(4):
time_loop(100)
vtk_writer(i)
print("Step ", i, perf_counter() - last, dh.max('phi'))
last = perf_counter()
```
%% Output
Step 0 19.713090835999992 1.0
Step 1 19.673075279000045 1.0
Step 2 19.696444219 1.0
Step 3 19.752472744999977 1.0
from pystencils import Field, TypedSymbol
from copy import copy, deepcopy from copy import copy, deepcopy
from pystencils.field import Field
from pystencils.typing import TypedSymbol
def test_field_access(): def test_field_access():
field = Field.create_generic('some_field', spatial_dimensions=2, index_dimensions=0) field = Field.create_generic('some_field', spatial_dimensions=2, index_dimensions=0)
......