Breathing Motion Module#

The translational simple breathing motion module can be used to add an arbitrary 3D translation curve to a trajectory that is captured by a Module from cmrsim.trajectory. This notebook demonstrates the functionality by adding sinusoidal breathing motion for two cases:

  1. A slice of a contracting LV mesh

  2. Laminar flow in a U-Bend with refilling

The sinusoidal motion is choosen because it is provided as classmethod constructor by the SimpleBreathingMotionModule, but it can easily replaced by more realistic breathing curves. To get a more detailed insight into the wrapped trajectory modules have a look at their corresponding example notebooks (Trajectory Modules -> Fitting taylor expansion to particle trajectories) and (Trajectory Modules -> Laminar Flow in stenotic U-Bend)

[1]:
import sys
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

import pyvista
import numpy as np
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from pint import Quantity
from IPython.display import display, clear_output

import tensorflow as tf
gpus = tf.config.list_physical_devices("GPU")
if gpus:
    gpu_index = 3
    tf.config.set_visible_devices(gpus[gpu_index], "GPU")
    tf.config.experimental.set_memory_growth(gpus[gpu_index], True)


sys.path.insert(0, "../../")
import cmrsim

sys.path.append("../")
import local_functions
%matplotlib widget

Contracting LV-slice with sinosoidal motion#

In this section, first snapshots of the LV-mesh are loaded and a mid-ventricular slice is selected from that. The TaylorTrajectoryN module class is used to capture the contractile motion by a polynomial also assuming the motion to repeat infinitely. The a sinosoidal breathing curve with periodicity of 2 contraction intervals is instantiated in form of a SimpleBreathingMotionModule, wrapping the contraction module. After calling the modules according to the genereric Trajectory module Interface to get the trajectories on a new time-grid, the results are rendered as gifs.

Instantiate Contraction Trajectory#

[2]:
files = [f'../example_resources/NOR/VTK4Phantom/Displ{i:03}.vtk' for i in range(24)]
timing = Quantity(np.arange(0, 24) * 50, "ms")
mesh_dataset = cmrsim.datasets.CardiacMeshDataset.from_list_of_meshes(files, timing, time_precision_ms=3, mesh_numbers=(4, 40, 30))

slice_dict = dict(slice_thickness=Quantity(20, "mm"),
                  slice_normal=np.array((0., 0., 1.)),
                  slice_position=Quantity([0, 0, -3], "cm"),
                  reference_time=Quantity(-1., "ms"))

slice_mesh = mesh_dataset.select_slice(**slice_dict)
slice_mod =  cmrsim.datasets.MeshDataset(slice_mesh, mesh_dataset.timing)
t_contraction, r_contraction = slice_mod.get_trajectories(start=slice_dict["reference_time"], end=Quantity(slice_mod.timing[-1], "ms"))
r_contraction += Quantity([[0, 0, 3]], "cm")
trajectory_contraction = cmrsim.trajectory.TaylorTrajectoryN(order=8, time_grid=t_contraction.m_as("ms"),
                                                             particle_trajectories=r_contraction.m_as("m"), is_periodic=True)

clear_output()

Instantiate Breathing Trajectory#

[3]:
trajectory_breathing = cmrsim.trajectory.SimpleBreathingMotionModule.from_sinosoidal_motion(trajectory_contraction,
                                                                                            breathing_period=Quantity(2.300, "s"),
                                                                                            breathing_direction=np.array([0.5, 2., 1.]),
                                                                                            breathing_amplitude=Quantity(4.5, "cm"))

Call the modules#

[4]:
# Define the Time-scale to evalute the trajectory on
timing = Quantity(np.linspace(0, 2.3, 100).astype(np.float32), "s")           # Used for the __call__ method
dt = tf.constant((timing[1] - timing[0]).m_as("ms"), dtype=tf.float32)        # Used for the increment_particles method

# Call the contraction module in isolation for comparison
r_contraction, _ = trajectory_contraction(initial_positions=r_contraction[:, 0],
                                          timing=timing.m_as("ms"))

# Call the breathing + contraction module (using both available methods)
r_breath, _ = trajectory_breathing(initial_positions=r_contraction[:, 0], timing=timing.m_as("ms"))
r_breath_inc = np.swapaxes(np.stack([trajectory_breathing.increment_particles(particle_positions=r_contraction[:, 0], dt=dt)[0]
                                     for _ in tqdm(timing, leave=False)]), 0, 1)

# For illustration purposes, get a representation of the breathing curve
r_bm = trajectory_breathing._interpolate_breathing_curve(timing.m_as("ms"))

Illustration of the results#

[5]:
## Create plot of the breathing curve only
plt.ioff()
plt.style.use('dark_background')
plt.rcParams["font.family"] = "serif"
figure, ax = plt.subplots(1, 1, figsize=(5, 5))
ax.plot(timing.m_as("ms"), r_bm.numpy())
ax.set_title("Breathing curve"), ax.set_ylabel("[m]"), ax.set_xlabel("Time [ms]"), ax.legend(["x", "y", "z"])
figure.tight_layout()
figure.savefig("breathing_curve.png", dpi=100)
plt.close(figure)

# # ## Render contracting LV mesh
custom_theme = pyvista.themes.DefaultTheme()
custom_theme.background = 'black'
custom_theme.font.family = 'times'
custom_theme.font.color = 'white'
custom_theme.outline_color = "white"
custom_theme.edge_color = 'grey'
custom_theme.transparent_background = False

for fname, traj in zip(["contraction only", "breathing and contraction"], [r_contraction, r_breath]):
    pyvista.start_xvfb()
    plotter = pyvista.Plotter(off_screen=True, window_size=(500, 500), theme=custom_theme)
    plotter.add_text(fname, position="lower_edge", font_size=14)
    local_functions.add_custom_axes(plotter)
    local_functions.animate_trajectories(plotter, trajectories=np.swapaxes(traj, 0, 1),
                                         timing=timing.m_as("ms"), filename=f"{fname}.gif",
                                         visualize_trajectories=True, visualize_trajectories_max_length=5,
                                         trajectory_kwargs = dict(color="blue", show_scalar_bar=False, opacity=0.8),
                                         mesh_kwargs = dict(show_scalar_bar=False), initial_mesh=slice_mod.mesh)
    pyvista.close_all()



import imageio
import base64
from IPython.display import HTML
imgs = [imageio.v3.imread(fname) for fname in ["breathing_curve.png", "contraction only.gif", "breathing and contraction.gif"]]
combined_gif = imageio.get_writer(f'meshmesh.gif')
for f1, f2 in tqdm(zip(*imgs[1:]), total=100, leave=False):
    combined_gif.append_data(np.concatenate([imgs[0][..., :3], f1, f2], axis=1))
combined_gif.close()
b64 = base64.b64encode(open("meshmesh.gif",'rb').read()).decode('ascii')
display(HTML(f'<img src="data:image/gif;base64,{b64}" />'))

Laminar Flow in moving U-Bend#

In this section the laminar-flow in a U-Bend of the corresponding example notebook is reproduced (including refilling at the in-flow region). Again, the particle trajectories are wrapped with a breathing trajectory.

Loading the U-Bend mesh including velocity field#

[6]:
# Load UBend Example.
mesh = pyvista.read("../example_resources/sUbend_219_9.vtu")

# The velocity [m/s] is stored in the mesh as U1. In cmrsim all definitions are done in [ms] therefore
# We add the velocity field as scaled data to the mesh
mesh["velocity"] = Quantity(mesh["U"], "m/s").m_as("m/ms")

# To match the usual geometry (Head <-> Foot along z-axis) flip the axes of the mesh
mesh.points = mesh.points[:,[2,1,0]]
mesh["velocity"] = mesh["velocity"][:,[2,1,0]]
global_mesh_offset = [0, 0.04, -0.05]
mesh.translate(global_mesh_offset, inplace=True)
mesh = mesh.cell_data_to_point_data()

# clip to restricted length to reduce mesh-size
mesh.clip(normal='-z', origin=[0., 0., -0.1], inplace=True);
[6]:
HeaderData Arrays
UnstructuredGridInformation
N Cells1034880
N Points1061341
X Bounds-1.603e-02, 1.602e-02
Y Bounds-6.312e-02, 5.603e-02
Z Bounds-1.000e-01, 9.140e-02
N Arrays4
NameFieldTypeN CompMinMax
UPointsfloat643-2.534e+005.380e+00
pPointsfloat641-3.984e+009.466e+00
RSTPointsfloat646-9.636e-012.484e+00
velocityPointsfloat643-2.534e-035.380e-03

Instantiate RefillingFlowDataset, FlowTrajectory module and SimpleBreathingMotionModule#

[7]:
import cmrsim.utils.particle_properties as part_factory

# Define Reseeding slice parameters
seeding_slice_position = Quantity([0, 0.048, -0.045], "m")
seeding_slice_normal = np.array([1., 0., 0.])
seeding_slice_thickness = Quantity(50, "mm")
seeding_pixel_spacing = np.array((0.25e-3, 0.25e-3, 0.25e-3))  # resolution of seeding mesh, mm
lookup_mesh_spacing = np.array((0.5e-3, 0.5e-3, 0.5e-3)) # resolution of lookup mesh, mm
seeding_slice_bb = Quantity([5, 5.1, 2], "cm")

particle_creation_fncs = {"magnetization": part_factory.norm_magnetization(),
                          "T1": part_factory.uniform(default_value=300.),
                          "T2": part_factory.uniform(default_value=100.),
                          "M0": part_factory.uniform(default_value=1)}

dataset = cmrsim.datasets.RefillingFlowDataset(mesh,
                                               particle_creation_callables=particle_creation_fncs,
                                               slice_position=seeding_slice_position.m_as("m"),
                                               slice_normal=seeding_slice_normal,
                                               slice_thickness=seeding_slice_thickness.m_as("m"),
                                               slice_bounding_box=seeding_slice_bb.m_as("m"),
                                               lookup_map_spacing=lookup_mesh_spacing,
                                               seeding_vol_spacing=seeding_pixel_spacing,
                                               field_list=[("velocity", 3), ])
velocity_field_3d, map_dimensions = dataset.get_lookup_table()

# Setup laminar flow trajectory module
trajectory_flow = cmrsim.trajectory.FlowTrajectory(velocity_field_3d,
                                                   map_dimensions=map_dimensions.astype(np.float32),
                                                   additional_dimension_mapping=dataset.field_list[1:],
                                                   device="GPU:0")

# Setup the breathing trajectory module
trajectory_breathing = cmrsim.trajectory.SimpleBreathingMotionModule.from_sinosoidal_motion(trajectory_flow,
                                                                                            breathing_period=Quantity(1, "s"),
                                                                                            breathing_direction=np.array([0.5, 2., 1.]),
                                                                                            breathing_amplitude=Quantity(4.5, "cm"))
Updated Mesh
Updated Slice Position

Call the motion module with interleaved refilling#

Note: To make the refilling work, the breathing offset has to be subtracted/added around the refilling call. Inside the trajectory modules increment_particles this offset addition is handled already.

[8]:
particle_density = Quantity(1, "1/mm^3")
positions, properties = dataset.initial_filling(particle_density.m)
print(f"Filling resulted in a total of: {positions.shape[0]} particles")

n_TR = 50
steps = 5
substeps = 200
dt = tf.constant(Quantity(10, "us").m_as("ms"))
timing_flow = np.arange(0, steps * n_TR) * (substeps * dt.numpy())

breathing_curve = trajectory_breathing._interpolate_breathing_curve(timing_flow)

r = positions
props = properties

trajec, vel = [], []
for tr in tqdm(range(n_TR)):
    pos_per_step = np.empty([steps, *r.shape], dtype=np.float32)
    vel_per_step = np.empty([steps, *r.shape], dtype=np.float32)
    pbar =  tqdm(range(steps), leave=False)
    for step in pbar:
        for _ in range(substeps):
            r, fields = trajectory_breathing.increment_particles(particle_positions=r, dt=dt, return_velocities=True)
            pbar.set_postfix_str(", ".join([f"{float(s) :1.7f}" for s in np.mean(r, axis=0)]))
        pos_per_step[step] = r.numpy()
        vel_per_step[step] = fields["velocity"].numpy()
    trajec.append(pos_per_step), vel.append(vel_per_step)

    r = r - trajectory_breathing.current_offset()
    r, props, _ = dataset(particle_density.m, residual_particle_pos=r.numpy(),
                          particle_properties=props, reseed_threshold=0.3,
                          distance_tolerance=Quantity(30, "cm").m_as("m"))
    r = r + trajectory_breathing.current_offset()

np.savez("temp_flows", **{f"r_{i:02}": r_ for i, r_ in enumerate(trajec)}, **{f"v_{i:02}": v_ for i, v_ in enumerate(vel)})
arch = np.load("temp_flows.npz")
trajec = [arch[f"r_{i:02}"] for i in range(n_TR)]
vel = [arch[f"v_{i:02}"] for i in range(n_TR)]
Filling resulted in a total of: 257957 particles

Render results#

[13]:
## Render Breathing Curve
plt.ioff()
plt.style.use('dark_background')
plt.rcParams["font.family"] = "serif"
figure, ax = plt.subplots(1, 1, figsize=(5, 5))
ax.plot(timing_flow, breathing_curve.numpy())
ax.set_title("Breathing curve"), ax.set_ylabel("[m]"), ax.set_xlabel("Time [ms]"), ax.legend(["x", "y", "z"])
figure.tight_layout()
figure.savefig("breathing_curve.png", dpi=100)
plt.close(figure)

## Render moving Ubend
custom_theme = pyvista.themes.DefaultTheme()
custom_theme.background = 'black'
custom_theme.font.family = 'times'
custom_theme.font.color = 'white'
custom_theme.outline_color = "white"
custom_theme.edge_color = 'black'
custom_theme.transparent_background = False


pyvista.start_xvfb()
plotter = pyvista.Plotter(off_screen=True, window_size=(800, 500), theme=custom_theme)
local_functions.add_custom_axes(plotter)
skip_frames = 2
local_functions.animate_trajectories(plotter, [_ for t in trajec for _ in t][::skip_frames],
                     timing_flow[::skip_frames],
                     scalars=[_ for v in vel for _ in np.linalg.norm(v, axis=-1)][::skip_frames],
                     projection_curve=breathing_curve[::skip_frames],
                     filename="flow_and_breathing.gif",
                     mesh_kwargs=dict(cmap="twilight", opacity=0.5, clim=[0, 0.007],
                                      scalar_bar_args={"title":"Velocity [m/ms]"}),
                     text_kwargs=dict(color="white", position="upper_right", font_size=14),
                     projection_kwargs=dict(line_width=3))

import imageio
import base64
from IPython.display import HTML
imgs = [imageio.v3.imread(fname) for fname in ["breathing_curve.png", "flow_and_breathing.gif"]]
combined_gif = imageio.get_writer(f'meshmeshflow.gif')
for frame in tqdm(imgs[1], total=len(timing_flow[::skip_frames]), leave=False):
    combined_gif.append_data(np.concatenate([imgs[0][..., :3], frame], axis=1))
combined_gif.close()
b64 = base64.b64encode(open("meshmeshflow.gif",'rb').read()).decode('ascii')
display(HTML(f'<img src="data:image/gif;base64,{b64}" />'))