Laminar Flow in stenotic U-Bend#

This notebook demonstrates the functionality of the trajectory module cmrsim.trajectory.FlowTrajectoryModule, alongside the Re-seeding dataset to simulate particle trajectories inside a stenotic U-Bend. The geometry and the pre-simulated velocity field is given as example resource as a vtk-Mesh.

Imports#

[1]:
import sys
sys.path.append("../")
sys.path.insert(0, "../../")

import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
os.environ['CUDA_VISIBLE_DEVICES'] = "3"

import tensorflow as tf
gpu = tf.config.get_visible_devices("GPU")
if gpu:
    tf.config.experimental.set_memory_growth(gpu[0], True)
[2]:
# 3rd Party dependencies
from tqdm.notebook import tqdm
from pint import Quantity
import pyvista
import numpy as np
import matplotlib.pyplot as plt
from copy import deepcopy
from IPython.display import display, HTML
import base64
%matplotlib inline

# Project library cmrsim
import local_functions
import cmrsim
import cmrsim.utils.particle_properties as part_factory

Define default settings for rendering the mesh

[3]:
custom_camera_pos = [(0.211, 0.267, 0.081),  (0.0262, 0.0261, 0.0172), (0.802, -0.570, -0.178)]

Load Mesh file#

The mesh is loaded and geometrically transformed/rotated such that the stanchions of the U-Bend points in z-direction. The modifications are performed using the pyvista package.

[4]:
# Load UBend Example.
mesh = pyvista.read("../example_resources/sUbend_219_9.vtu")

# The velocity [m/s] is stored in the mesh as U1. In cmrsim all definitions are done in [ms] therefore
# We add the velocity field as scaled data to the mesh
mesh["velocity"] = Quantity(mesh["U"], "m/s").m_as("m/ms")

# To match the usual geometry (Head <-> Foot along z-axis) flip the axes of the mesh
mesh.points = mesh.points[:, [2, 1, 0]]
mesh["velocity"] = mesh["velocity"][:, [2, 1, 0]]
global_mesh_offset = [0, 0.04, -0.05]
mesh.translate(global_mesh_offset, inplace=True)
mesh = mesh.cell_data_to_point_data()

# clip to restricted length to reduce mesh-size
mesh.clip(normal='-z', origin=[0., 0., -0.1], inplace=True)
[4]:
HeaderData Arrays
UnstructuredGridInformation
N Cells1034880
N Points1061341
X Bounds-1.603e-02, 1.602e-02
Y Bounds-6.312e-02, 5.603e-02
Z Bounds-1.000e-01, 9.140e-02
N Arrays4
NameFieldTypeN CompMinMax
UPointsfloat643-2.534e+005.380e+00
pPointsfloat641-3.984e+009.466e+00
RSTPointsfloat646-9.636e-012.484e+00
velocityPointsfloat643-2.534e-035.380e-03

Render the mesh and display image.

[5]:
pyvista.close_all()
plotter = pyvista.Plotter(off_screen=True, window_size=(500, 500))
local_functions.add_custom_axes(plotter)
plotter.camera_position = custom_camera_pos
plotter.add_mesh(mesh, scalars="velocity", cmap="twilight", opacity=0.5,
                 scalar_bar_args={"title":"Velocity [m/ms]"}, clim=[0., 0.006])
plotter.add_mesh(mesh.slice_orthogonal(), show_scalar_bar=False, cmap="twilight", clim=[0., 0.007])
plotter.show(jupyter_backend="static")
../../_images/example_gallery_trajectory_modules_flow_trajectory_module_9_0.png

Instantiate the Trajectory module#

The trajectory module needs a velocity field definition on a regular grid, as the look up and trilinear interpolation is implemented using this assumption. The most convenient way to achieve this, is to use the RefillingFlowDataset class. For more detailed instructions checkout the corresponding notebook and the API reference.

[8]:
# Define Reseeding slice parameters
seeding_slice_position = Quantity([0, 0.048, -0.045], "m")
seeding_slice_normal = np.array([1., 0., 0.])
seeding_slice_thickness = Quantity(50, "mm")
seeding_pixel_spacing = np.array((0.25e-3, 0.25e-3, 0.25e-3))  # resolution of seeding mesh, mm
lookup_mesh_spacing = np.array((0.5e-3, 0.5e-3, 0.5e-3))  # resolution of lookup mesh, mm
seeding_slice_bb = Quantity([5, 5.1, 2], "cm")
seeding_particle_density = 10  # per 1/mm*3

particle_creation_fncs = {"magnetization": part_factory.norm_magnetization(),
                          "T1": part_factory.uniform(default_value=300.),
                          "T2": part_factory.uniform(default_value=100.),
                          "M0": part_factory.uniform(default_value=1)}

dataset = cmrsim.datasets.RefillingFlowDataset(mesh,
                                               particle_creation_callables=particle_creation_fncs,
                                               slice_position=seeding_slice_position.m_as("m"),
                                               slice_normal=seeding_slice_normal,
                                               slice_thickness=seeding_slice_thickness.m_as("m"),
                                               slice_bounding_box=seeding_slice_bb.m_as("m"),
                                               lookup_map_spacing=lookup_mesh_spacing,
                                               seeding_vol_spacing=seeding_pixel_spacing,
                                               field_list=[("velocity", 3), ])
Updated Mesh
Updated Slice Position

Now actually instantiate the trajectory module by using the Dataset-field-definitions

[9]:
velocity_field_3d, map_dimensions = dataset.get_lookup_table()

# Setup trajectory module
trajectory_module = cmrsim.trajectory.FlowTrajectory(velocity_field_3d,
                                                     map_dimensions=map_dimensions.astype(np.float32),
                                                     additional_dimension_mapping=dataset.field_list[1:],
                                                     device="GPU:0")

Simulate Particle Trajectories#

To simulate actual trajetories, we first need to seed particles into the velocity fields. A reasonable assumption is, that the original volume is filled with a uniform density of partilces. Furthermore, in-flow and out-flow is taken care of by calling the RefillingFlowDataset instance every 5 milliseconds, mimicking a sequence with a corresponding TR.

[10]:
particle_density = Quantity(1, "1/mm^3")
positions, properties = dataset.initial_filling(particle_density.m)
print(f"Filling resulted in a total of: {positions.shape[0]} particles")
Filling resulted in a total of: 257957 particles
Now numerically solve the kinetic equation for particles in a laminar flow-field by calling the increment_particles function in a loop to advance the position of all seeded particles and interleave the reseeding as described above.
For illustration the intermediate positions and velocities are saved every 0.2 milliseconds.
[11]:
n_TR = 2
steps = 25
substeps = 70
dt = tf.constant(Quantity(10, "us").m_as("ms"))
r = positions
props = properties

trajec, vel = [], []
for tr in tqdm(range(n_TR)):
    pos_per_step = np.empty([steps, *r.shape], dtype=np.float32)
    vel_per_step = np.empty([steps, *r.shape], dtype=np.float32)

    for step in tqdm(range(steps), leave=False):
        for _ in range(substeps):
            r, fields = trajectory_module.increment_particles(r, dt, return_velocities=True)
        pos_per_step[step] = r.numpy()
        vel_per_step[step] = fields["velocity"].numpy()

    trajec.append(pos_per_step), vel.append(vel_per_step)
    r, props, _ = dataset(particle_density.m, residual_particle_pos=r.numpy(),
                          particle_properties=props, reseed_threshold=0.3,
                          distance_tolerance=Quantity(30, "cm").m_as("m"))

WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
I0000 00:00:1710868612.037213 2139880 device_compiler.h:186] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.

Animate Trajectories#

[12]:
timing = np.arange(0, steps * n_TR) * (substeps * dt.numpy())

plotter = pyvista.Plotter(off_screen=True, window_size=(2400, 950), shape=(1, 2))
plotter.subplot(0, 0)
local_functions.add_custom_axes(plotter)
plotter.camera_position = custom_camera_pos
plotter.add_mesh(mesh, scalars="velocity", cmap="twilight", opacity=0.5,
                 scalar_bar_args={"title": "Velocity [m/ms]"}, clim=[0., 0.006])
plotter.add_mesh(mesh.slice_orthogonal(), show_scalar_bar=False, cmap="twilight", clim=[0., 0.007])
local_functions.add_refilling_box(plotter, seeding_slice_bb, seeding_slice_thickness, seeding_slice_position)
plotter.add_text("C", position="upper_left")

plotter.subplot(0, 1)
local_functions.add_custom_axes(plotter)
plotter.camera_position = custom_camera_pos
plotter.add_text("D", position="upper_left")

local_functions.animate_trajectories(plotter, [_[0:] for t in trajec for _ in t], timing,
                                     filename="animation.gif",
                                     scalars=[_[0:] for v in vel for _ in np.linalg.norm(v, axis=-1)],
                                     mesh_kwargs=dict(cmap="twilight", opacity=0.5, clim=[0, 0.007],
                                                      scalar_bar_args={"title": "Velocity [m/ms]"}),
                                     text_kwargs=dict(color="black", position='upper_right'))

plotter.close()
[17]:
b64 = base64.b64encode(open("animation.gif", 'rb').read()).decode('ascii')
display(HTML(f'<img src="data:image/gif;base64,{b64}" />'))