Breathing Motion Module#
The translational simple breathing motion module can be used to add an arbitrary 3D translation curve to a trajectory that is captured by a Module from cmrsim.trajectory
. This notebook demonstrates the functionality by adding sinusoidal breathing motion for two cases:
A slice of a contracting LV mesh
Laminar flow in a U-Bend with refilling
The sinusoidal motion is choosen because it is provided as classmethod constructor by the SimpleBreathingMotionModule
, but it can easily replaced by more realistic breathing curves. To get a more detailed insight into the wrapped trajectory modules have a look at their corresponding example notebooks (Trajectory Modules -> Fitting taylor expansion to particle trajectories) and (Trajectory Modules -> Laminar Flow in stenotic U-Bend)
[1]:
import sys
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
import pyvista
import numpy as np
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from pint import Quantity
from IPython.display import display, clear_output
import tensorflow as tf
gpus = tf.config.list_physical_devices("GPU")
if gpus:
gpu_index = 3
tf.config.set_visible_devices(gpus[gpu_index], "GPU")
tf.config.experimental.set_memory_growth(gpus[gpu_index], True)
sys.path.insert(0, "../../")
import cmrsim
sys.path.append("../")
import local_functions
%matplotlib widget
Contracting LV-slice with sinosoidal motion#
In this section, first snapshots of the LV-mesh are loaded and a mid-ventricular slice is selected from that. The TaylorTrajectoryN
module class is used to capture the contractile motion by a polynomial also assuming the motion to repeat infinitely. The a sinosoidal breathing curve with periodicity of 2 contraction intervals is instantiated in form of a SimpleBreathingMotionModule
, wrapping the contraction module. After calling the modules according to the genereric Trajectory module
Interface to get the trajectories on a new time-grid, the results are rendered as gifs.
Instantiate Contraction Trajectory#
[2]:
files = [f'../example_resources/NOR/VTK4Phantom/Displ{i:03}.vtk' for i in range(24)]
timing = Quantity(np.arange(0, 24) * 50, "ms")
mesh_dataset = cmrsim.datasets.CardiacMeshDataset.from_list_of_meshes(files, timing, time_precision_ms=3, mesh_numbers=(4, 40, 30))
slice_dict = dict(slice_thickness=Quantity(20, "mm"),
slice_normal=np.array((0., 0., 1.)),
slice_position=Quantity([0, 0, -3], "cm"),
reference_time=Quantity(-1., "ms"))
slice_mesh = mesh_dataset.select_slice(**slice_dict)
slice_mod = cmrsim.datasets.MeshDataset(slice_mesh, mesh_dataset.timing)
t_contraction, r_contraction = slice_mod.get_trajectories(start=slice_dict["reference_time"], end=Quantity(slice_mod.timing[-1], "ms"))
r_contraction += Quantity([[0, 0, 3]], "cm")
trajectory_contraction = cmrsim.trajectory.TaylorTrajectoryN(order=8, time_grid=t_contraction.m_as("ms"),
particle_trajectories=r_contraction.m_as("m"), is_periodic=True)
clear_output()
Instantiate Breathing Trajectory#
[3]:
trajectory_breathing = cmrsim.trajectory.SimpleBreathingMotionModule.from_sinosoidal_motion(trajectory_contraction,
breathing_period=Quantity(2.300, "s"),
breathing_direction=np.array([0.5, 2., 1.]),
breathing_amplitude=Quantity(4.5, "cm"))
Call the modules#
[4]:
# Define the Time-scale to evalute the trajectory on
timing = Quantity(np.linspace(0, 2.3, 100).astype(np.float32), "s") # Used for the __call__ method
dt = tf.constant((timing[1] - timing[0]).m_as("ms"), dtype=tf.float32) # Used for the increment_particles method
# Call the contraction module in isolation for comparison
r_contraction, _ = trajectory_contraction(initial_positions=r_contraction[:, 0],
timing=timing.m_as("ms"))
# Call the breathing + contraction module (using both available methods)
r_breath, _ = trajectory_breathing(initial_positions=r_contraction[:, 0], timing=timing.m_as("ms"))
r_breath_inc = np.swapaxes(np.stack([trajectory_breathing.increment_particles(particle_positions=r_contraction[:, 0], dt=dt)[0]
for _ in tqdm(timing, leave=False)]), 0, 1)
# For illustration purposes, get a representation of the breathing curve
r_bm = trajectory_breathing._interpolate_breathing_curve(timing.m_as("ms"))
Illustration of the results#
[5]:
## Create plot of the breathing curve only
plt.ioff()
plt.style.use('dark_background')
plt.rcParams["font.family"] = "serif"
figure, ax = plt.subplots(1, 1, figsize=(5, 5))
ax.plot(timing.m_as("ms"), r_bm.numpy())
ax.set_title("Breathing curve"), ax.set_ylabel("[m]"), ax.set_xlabel("Time [ms]"), ax.legend(["x", "y", "z"])
figure.tight_layout()
figure.savefig("breathing_curve.png", dpi=100)
plt.close(figure)
# # ## Render contracting LV mesh
custom_theme = pyvista.themes.DefaultTheme()
custom_theme.background = 'black'
custom_theme.font.family = 'times'
custom_theme.font.color = 'white'
custom_theme.outline_color = "white"
custom_theme.edge_color = 'grey'
custom_theme.transparent_background = False
for fname, traj in zip(["contraction only", "breathing and contraction"], [r_contraction, r_breath]):
pyvista.start_xvfb()
plotter = pyvista.Plotter(off_screen=True, window_size=(500, 500), theme=custom_theme)
plotter.add_text(fname, position="lower_edge", font_size=14)
local_functions.add_custom_axes(plotter)
local_functions.animate_trajectories(plotter, trajectories=np.swapaxes(traj, 0, 1),
timing=timing.m_as("ms"), filename=f"{fname}.gif",
visualize_trajectories=True, visualize_trajectories_max_length=5,
trajectory_kwargs = dict(color="blue", show_scalar_bar=False, opacity=0.8),
mesh_kwargs = dict(show_scalar_bar=False), initial_mesh=slice_mod.mesh)
pyvista.close_all()
import imageio
import base64
from IPython.display import HTML
imgs = [imageio.v3.imread(fname) for fname in ["breathing_curve.png", "contraction only.gif", "breathing and contraction.gif"]]
combined_gif = imageio.get_writer(f'meshmesh.gif')
for f1, f2 in tqdm(zip(*imgs[1:]), total=100, leave=False):
combined_gif.append_data(np.concatenate([imgs[0][..., :3], f1, f2], axis=1))
combined_gif.close()
b64 = base64.b64encode(open("meshmesh.gif",'rb').read()).decode('ascii')
display(HTML(f'<img src="data:image/gif;base64,{b64}" />'))
Laminar Flow in moving U-Bend#
In this section the laminar-flow in a U-Bend of the corresponding example notebook is reproduced (including refilling at the in-flow region). Again, the particle trajectories are wrapped with a breathing trajectory.
Loading the U-Bend mesh including velocity field#
[6]:
# Load UBend Example.
mesh = pyvista.read("../example_resources/sUbend_219_9.vtu")
# The velocity [m/s] is stored in the mesh as U1. In cmrsim all definitions are done in [ms] therefore
# We add the velocity field as scaled data to the mesh
mesh["velocity"] = Quantity(mesh["U"], "m/s").m_as("m/ms")
# To match the usual geometry (Head <-> Foot along z-axis) flip the axes of the mesh
mesh.points = mesh.points[:,[2,1,0]]
mesh["velocity"] = mesh["velocity"][:,[2,1,0]]
global_mesh_offset = [0, 0.04, -0.05]
mesh.translate(global_mesh_offset, inplace=True)
mesh = mesh.cell_data_to_point_data()
# clip to restricted length to reduce mesh-size
mesh.clip(normal='-z', origin=[0., 0., -0.1], inplace=True);
[6]:
Header | Data Arrays | ||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
Instantiate RefillingFlowDataset, FlowTrajectory module and SimpleBreathingMotionModule#
[7]:
import cmrsim.utils.particle_properties as part_factory
# Define Reseeding slice parameters
seeding_slice_position = Quantity([0, 0.048, -0.045], "m")
seeding_slice_normal = np.array([1., 0., 0.])
seeding_slice_thickness = Quantity(50, "mm")
seeding_pixel_spacing = np.array((0.25e-3, 0.25e-3, 0.25e-3)) # resolution of seeding mesh, mm
lookup_mesh_spacing = np.array((0.5e-3, 0.5e-3, 0.5e-3)) # resolution of lookup mesh, mm
seeding_slice_bb = Quantity([5, 5.1, 2], "cm")
particle_creation_fncs = {"magnetization": part_factory.norm_magnetization(),
"T1": part_factory.uniform(default_value=300.),
"T2": part_factory.uniform(default_value=100.),
"M0": part_factory.uniform(default_value=1)}
dataset = cmrsim.datasets.RefillingFlowDataset(mesh,
particle_creation_callables=particle_creation_fncs,
slice_position=seeding_slice_position.m_as("m"),
slice_normal=seeding_slice_normal,
slice_thickness=seeding_slice_thickness.m_as("m"),
slice_bounding_box=seeding_slice_bb.m_as("m"),
lookup_map_spacing=lookup_mesh_spacing,
seeding_vol_spacing=seeding_pixel_spacing,
field_list=[("velocity", 3), ])
velocity_field_3d, map_dimensions = dataset.get_lookup_table()
# Setup laminar flow trajectory module
trajectory_flow = cmrsim.trajectory.FlowTrajectory(velocity_field_3d,
map_dimensions=map_dimensions.astype(np.float32),
additional_dimension_mapping=dataset.field_list[1:],
device="GPU:0")
# Setup the breathing trajectory module
trajectory_breathing = cmrsim.trajectory.SimpleBreathingMotionModule.from_sinosoidal_motion(trajectory_flow,
breathing_period=Quantity(1, "s"),
breathing_direction=np.array([0.5, 2., 1.]),
breathing_amplitude=Quantity(4.5, "cm"))
Updated Mesh
Updated Slice Position
Call the motion module with interleaved refilling#
Note: To make the refilling work, the breathing offset has to be subtracted/added around the refilling call. Inside the trajectory modules increment_particles
this offset addition is handled already.
[8]:
particle_density = Quantity(1, "1/mm^3")
positions, properties = dataset.initial_filling(particle_density.m)
print(f"Filling resulted in a total of: {positions.shape[0]} particles")
n_TR = 50
steps = 5
substeps = 200
dt = tf.constant(Quantity(10, "us").m_as("ms"))
timing_flow = np.arange(0, steps * n_TR) * (substeps * dt.numpy())
breathing_curve = trajectory_breathing._interpolate_breathing_curve(timing_flow)
r = positions
props = properties
trajec, vel = [], []
for tr in tqdm(range(n_TR)):
pos_per_step = np.empty([steps, *r.shape], dtype=np.float32)
vel_per_step = np.empty([steps, *r.shape], dtype=np.float32)
pbar = tqdm(range(steps), leave=False)
for step in pbar:
for _ in range(substeps):
r, fields = trajectory_breathing.increment_particles(particle_positions=r, dt=dt, return_velocities=True)
pbar.set_postfix_str(", ".join([f"{float(s) :1.7f}" for s in np.mean(r, axis=0)]))
pos_per_step[step] = r.numpy()
vel_per_step[step] = fields["velocity"].numpy()
trajec.append(pos_per_step), vel.append(vel_per_step)
r = r - trajectory_breathing.current_offset()
r, props, _ = dataset(particle_density.m, residual_particle_pos=r.numpy(),
particle_properties=props, reseed_threshold=0.3,
distance_tolerance=Quantity(30, "cm").m_as("m"))
r = r + trajectory_breathing.current_offset()
np.savez("temp_flows", **{f"r_{i:02}": r_ for i, r_ in enumerate(trajec)}, **{f"v_{i:02}": v_ for i, v_ in enumerate(vel)})
arch = np.load("temp_flows.npz")
trajec = [arch[f"r_{i:02}"] for i in range(n_TR)]
vel = [arch[f"v_{i:02}"] for i in range(n_TR)]
Filling resulted in a total of: 257957 particles
Render results#
[13]:
## Render Breathing Curve
plt.ioff()
plt.style.use('dark_background')
plt.rcParams["font.family"] = "serif"
figure, ax = plt.subplots(1, 1, figsize=(5, 5))
ax.plot(timing_flow, breathing_curve.numpy())
ax.set_title("Breathing curve"), ax.set_ylabel("[m]"), ax.set_xlabel("Time [ms]"), ax.legend(["x", "y", "z"])
figure.tight_layout()
figure.savefig("breathing_curve.png", dpi=100)
plt.close(figure)
## Render moving Ubend
custom_theme = pyvista.themes.DefaultTheme()
custom_theme.background = 'black'
custom_theme.font.family = 'times'
custom_theme.font.color = 'white'
custom_theme.outline_color = "white"
custom_theme.edge_color = 'black'
custom_theme.transparent_background = False
pyvista.start_xvfb()
plotter = pyvista.Plotter(off_screen=True, window_size=(800, 500), theme=custom_theme)
local_functions.add_custom_axes(plotter)
skip_frames = 2
local_functions.animate_trajectories(plotter, [_ for t in trajec for _ in t][::skip_frames],
timing_flow[::skip_frames],
scalars=[_ for v in vel for _ in np.linalg.norm(v, axis=-1)][::skip_frames],
projection_curve=breathing_curve[::skip_frames],
filename="flow_and_breathing.gif",
mesh_kwargs=dict(cmap="twilight", opacity=0.5, clim=[0, 0.007],
scalar_bar_args={"title":"Velocity [m/ms]"}),
text_kwargs=dict(color="white", position="upper_right", font_size=14),
projection_kwargs=dict(line_width=3))
import imageio
import base64
from IPython.display import HTML
imgs = [imageio.v3.imread(fname) for fname in ["breathing_curve.png", "flow_and_breathing.gif"]]
combined_gif = imageio.get_writer(f'meshmeshflow.gif')
for frame in tqdm(imgs[1], total=len(timing_flow[::skip_frames]), leave=False):
combined_gif.append_data(np.concatenate([imgs[0][..., :3], frame], axis=1))
combined_gif.close()
b64 = base64.b64encode(open("meshmeshflow.gif",'rb').read()).decode('ascii')
display(HTML(f'<img src="data:image/gif;base64,{b64}" />'))