""" This module contains the base implementation of the encoding mechanism of the simulation"""
__all__ = ["BaseSampling",]
from typing import Union, Iterable, Tuple
import abc
import math
import tensorflow as tf
import numpy as np
from pint import Quantity
# pylint: disable=abstract-method
[docs]
class BaseSampling(tf.Module):
"""Base Module for implementing a time-dependent sampling in k-space. Is meant to be inherited
from when specifying standard trajectories.
:param absolute_noise_std: if < 0, add_noise() will leave signal unchanged
:param name: (str) defining the module name-scope
:param k_space_segments: Number of segments in k-space. Total number of samples divided by
the number of segments must be an integer value
:param device: str e.g. 'GPU:0'
"""
# pylint: disable=too-many-instance-attributes
#: Flat and unsegmented tensor of all sampling-event times. Is set by a call of
# abstract function self._calculate_trajectory. Expected shape: (-1, )
sampling_times: tf.Variable
#: Flat and unsegmented tensor of 3D k-space vectors corresponding to sampling-events.
# Is set by a call of abstract function self._calculate_trajectory. Expected shape: (-1, 3)
k_space_vectors: tf.Variable
#: absolute values for the standard deviation of the sampled noise distributions added
# to the k-space signal
absolute_noise_std: tf.Variable
#: Number of segments used to subdivide the simulation memory load
k_space_segments: tf.Variable
#: Number of k-space samples that are defined by the _calculate trajectory method
number_of_samples: tf.Tensor
#: Name of the device that the module is executed on (defaults to: GPU:0 - CPU:0)
device: str
#: Spatial acquisition offset (corresponds to frequency offset)
acq_position: tf.Variable = tf.Variable(tf.zeros(3), dtype=tf.float32)
#: Orientation matrix (3, 4) performing the transformation from slice to global coordinates
ori_matrix: tf.Variable
_segmented_sampling_times = None # set in update
_segmented_k_vectors = None # set in update
def __init__(self, absolute_noise_std: Union[float, Iterable[float]], name: str = None,
k_space_segments: int = 1, device: str = None,
orientation_matrix: np.ndarray = None):
"""
:param absolute_noise_std: if < 0, add_noise() will leave signal unchanged
:param name: (str) defining the module name-scope
:param k_space_segments: int
:param device: str e.g. 'GPU:0'
"""
if device is None:
if tf.config.get_visible_devices('GPU'):
self.device = 'GPU:0'
else:
self.device = 'CPU:0'
else:
self.device = device
super().__init__(name=name)
if isinstance(absolute_noise_std, float):
absolute_noise_std = [absolute_noise_std, ]
with tf.device(self.device):
self.absolute_noise_std = tf.Variable(tf.constant(absolute_noise_std), dtype=tf.float32,
trainable=False, shape=[None, ],
name='absolute_noise_std')
self.k_space_segments = tf.Variable(tf.constant(k_space_segments), dtype=tf.int32,
trainable=False)
self.k_space_vectors = tf.Variable(tf.zeros((1, 3), tf.float32), shape=[None, 3],
dtype=tf.float32, trainable=False,
name='k_space_vectors')
self.sampling_times = tf.Variable(tf.zeros((1,), tf.float32), shape=[None, ],
dtype=tf.float32, trainable=False, name='sampling_times')
self.update()
self.ori_matrix = orientation_matrix
[docs]
def update(self):
""" Assigns values to trajectory vectors. Should be used after modifying the parameters of
the encoding module.
:return:
"""
k_space_vectors, sampling_times = self._calculate_trajectory()
self.k_space_vectors.assign(k_space_vectors)
self.sampling_times.assign(sampling_times)
self.number_of_samples = tf.size(self.sampling_times.read_value())
assert (self.number_of_samples % self.k_space_segments == 0)
# Define segmented k-space trajectory for lower GPU memory requirements
segment_size = tf.cast(tf.math.ceil(self.number_of_samples /
self.k_space_segments.read_value()), tf.int32)
total_element_count = self.k_space_segments.read_value() * segment_size
difference = total_element_count - self.number_of_samples
row_lengths = tf.constant([int(segment_size) for _ in
tf.range(self.k_space_segments.read_value() - 1)] +
[int(segment_size - difference), ], dtype=tf.int32)
self._segmented_sampling_times = tf.RaggedTensor.from_row_lengths(
self.sampling_times.read_value(), row_lengths)
self._segmented_k_vectors = tf.RaggedTensor.from_row_lengths(
self.k_space_vectors.read_value(), row_lengths)
[docs]
@tf.function(jit_compile=False)
def __call__(self, m_transverse: tf.Tensor, r_vectors: tf.Tensor):
""" Fourier transforms the handed batch of object points / isochromates,
:param m_transverse:
:param r_vectors:
:return: tf.Tensor of shape (#repetitions, #k-space-samples)
"""
with tf.device(self.device):
# Allocate k-space-Tensor
# s_of_k_segments = tf.TensorArray(dtype=tf.complex64, size=n_segments, dynamic_size=False)
samples_per_segment = tf.cast(self.number_of_samples / self.k_space_segments, tf.int32)
accumulator_shape = tf.stack([self.k_space_segments, tf.shape(m_transverse)[1], samples_per_segment], axis=0)
s_of_k_segments = tf.zeros(accumulator_shape, dtype=tf.complex64)
max_mtrans_index = tf.math.reduce_min(tf.stack([tf.shape(m_transverse)[-1], self.k_space_segments]))
max_rvectrans_index = tf.math.reduce_min(tf.stack([tf.shape(r_vectors)[-2], self.k_space_segments]))
# Loop over k-space segments, defined by k-space trajectory in self.encoding_module
for segment_index in tf.range(self.k_space_segments):
mseg_idx = tf.reduce_min(tf.stack([segment_index + 1, max_mtrans_index]))
rseg_idx = tf.reduce_min(tf.stack([segment_index + 1, max_rvectrans_index]))
m_segment = m_transverse[..., samples_per_segment * (mseg_idx - 1): samples_per_segment * mseg_idx]
r_segment = r_vectors[..., samples_per_segment * (rseg_idx - 1):samples_per_segment*rseg_idx, :]
k_space_segment = self.call_single_segment(m_segment, r_segment, segment_index=segment_index)
s_of_k_segments = tf.tensor_scatter_nd_update(s_of_k_segments, [[segment_index], ],
k_space_segment[tf.newaxis])
# Concat all segments and transpose axes to match needed format
return tf.reshape(tf.transpose(s_of_k_segments, [1, 0, 2]), [accumulator_shape[1], -1])
[docs]
@tf.function(jit_compile=False)
def call_single_segment(self, transverse_magnetization: tf.Tensor, r_vectors: tf.Tensor,
segment_index: Union[int, tf.Tensor] = 0, **kwargs) -> tf.Tensor:
"""Calculates fourier phases for given object-representation at r-vectors. For multiple
different contrasts #repetitions is the representing axis.
Motion during encoding is captured in the r_vectors input axis #k-space-samples. If
this axis has size=1, this position is reused for all sampling ADC events.
If #repetitions = 1 in r_vectors, the same trajectory/constant position is reused
for all contrasts.
:param transverse_magnetization: (#voxel, #repetitions, #k-space-samples) of
type tf.complex; #repetitions, #k-space-samples can be 1
:param r_vectors: (#voxel, #repetitions, #k-space-samples, 3), axis #repetitions
and #k-space-samples can be 1 to broadcast for coordinate reuse.
:param segment_index: int
:return: tf.Tensor (..., k-space-points in segment)
"""
with tf.device(self.device):
n_repetitions = tf.shape(transverse_magnetization)[1]
fourier_factors = self._calculate_fourier_phases(r_vectors=r_vectors,
segment_index=segment_index)
with tf.name_scope('shape_handling'):
if tf.shape(fourier_factors)[1] < n_repetitions:
multiples = n_repetitions // tf.shape(fourier_factors)[1]
fourier_factors = tf.tile(fourier_factors, [1, multiples, 1])
if tf.shape(transverse_magnetization)[2] == 1:
n_samples = tf.size(self._segmented_sampling_times[segment_index])
transverse_magnetization = tf.tile(transverse_magnetization,
[1, 1, n_samples])
fourier_transform_batch = tf.einsum('vrk, vrk -> rk',
transverse_magnetization, fourier_factors)
return fourier_transform_batch
def _calculate_fourier_phases(self, r_vectors: tf.Tensor, segment_index: tf.Tensor):
# pylint: disable=anomalous-backslash-in-string
""" Calculates the phase factor for encoding a static (during read-out) object for the
k-space-trajectory defined by self.k_space_vectors
.. math::
f = e^{2 \pi j\ k_r(t) \cdot r(t) }
:param r_vectors: (#batch, #repetition, #kspace, 3)
:param segment_index: int
:return:
"""
with tf.name_scope('calculate_fourier_phase'):
segment_batch_size = tf.size(self._segmented_sampling_times[segment_index])
if self.ori_matrix is not None:
# augment dummy entry (x,y,z,1)
r_vectors = r_vectors - tf.reshape(self.ori_matrix[:, 3], [1, 1, 3])
r_vectors = tf.einsum('ij, brkj -> brki', self.ori_matrix[:3, :3], r_vectors)
if tf.shape(r_vectors)[2] == 1: # tile if #kspace-samples is 1
r_vectors = tf.tile(r_vectors, [1, 1, segment_batch_size, 1])
elif tf.shape(r_vectors)[2] != segment_batch_size:
start = segment_batch_size * segment_index
r_vectors = r_vectors[..., start:start + segment_batch_size, :]
r_vectors = r_vectors - tf.reshape(self.acq_position, [1, 1, 1, 3])
fourier_phases = tf.einsum('vrkj, kj -> vrk', r_vectors,
self._segmented_k_vectors[segment_index])
fourier_phases = 1j * tf.cast(
tf.scalar_mul(tf.constant(2. * math.pi, tf.float32), fourier_phases), tf.complex64)
fourier_factors = tf.exp(-fourier_phases)
return fourier_factors
[docs]
def add_noise(self, s_of_k: tf.Tensor, **kwargs):
""" Adds noise to k-space-samples, expands the number of axis by one and appends the
different noise instantiations as second last axis.
:param s_of_k: Tensor containing all encoded k-space samples
:return: tf.Tensor
"""
with tf.name_scope('add_noise'):
old_shape = tf.shape(s_of_k)
s_of_k = tf.expand_dims(s_of_k, axis=-2)
if tf.reduce_any(tf.math.greater(self.absolute_noise_std, tf.constant(0.))):
seed = kwargs.get("seed", None)
# Set values less that 0 to exactly zero to scales sampled noise vector correctly
clean_noise_std = tf.where(
tf.math.greater(self.absolute_noise_std, tf.constant(0.)),
self.absolute_noise_std, tf.zeros_like(self.absolute_noise_std))
noise_vector_shape = tf.concat(
((2,), old_shape, (tf.size(self.absolute_noise_std),)), 0)
noise_vector = tf.random.normal(noise_vector_shape, mean=0., stddev=1., seed=seed)
# Transposition of last two axis on purpose for subsequent addition
# Einsum indices: real/imag channel, image_batch, ...
# ... repetitions, pixels, noise_instantiations
n_half = tf.cast(tf.size(self.sampling_times), tf.float32) / 2.
noise_vector = tf.einsum('crpn, n -> crnp', noise_vector,
clean_noise_std * tf.sqrt(n_half))
complex_noise_vector = tf.complex(noise_vector[0], noise_vector[1])
s_of_k = s_of_k + complex_noise_vector
return s_of_k
@abc.abstractmethod
def _calculate_trajectory(self):
""" Calculates the k_space_trajectory with the given variables of the module. The
trajectory is given as tensor of shape (#samples, 3). Additionally it should return a
tensor of times corresponding to the samples
(#samples, 1) in ms
:return: k_space_vectors, sampling_times
"""
raise NotImplementedError
[docs]
def get_sampling_times(self):
"""Getter for sampling times. Defines format that should be used for all Signal modules
that need the timing of the sampling events.
:return: - tf.RaggedTensor or numpy.ndarray
- sampling times as numpy.ndarray in case the trajectory is not segmented.
"""
if int(self.k_space_segments) > 1: # noqa
return self._segmented_sampling_times
return self.sampling_times.read_value()
[docs]
def set_orientation_matrix(self, slice_position: Quantity, slice_normal: np.ndarray,
readout_direction: np.ndarray):
"""
:param slice_position: (3, )
:param slice_normal:
:param readout_direction:
"""
from cmrsim.utils.coordinates import compute_orientation_matrix
# Strip the last row to make it a (3, 4) matrix
trafo_mat = compute_orientation_matrix(slice_normal, slice_position, readout_direction)[:3]
if self.ori_matrix is None:
self.ori_matrix = tf.Variable(trafo_mat.astype(np.float32), shape=(3, 4),
dtype=tf.float32)
else:
self.ori_matrix.assign(trafo_mat.astype(np.float32))