Source code for cmrsim.trajectory._proper_ortho_decomp

"""Contains the implementation of a proper orthogonal decomposition module for trajectory modules"""

__all__ = ["PODTrajectoryModule"]

from typing import Dict, Tuple

import tensorflow as tf
import numpy as np

from cmrsim.trajectory._taylor import TaylorTrajectoryN
from cmrsim.trajectory._base import BaseTrajectoryModule


# pylint: disable=abstract-method
[docs] class PODTrajectoryModule(BaseTrajectoryModule): # pylint: disable=anomalous-backslash-in-string """Captures the trajectories of an arbitrary set of particles (e.g. Nodes of structured meshes) by computing the proper orthogonal decomposition (POD) from a set of snapshots. .. math:: u(t) = \Sigma_j^{N_{modes}} \phi_j w_j(t), where :math:`\phi_j` are the computed basis functions (modes) $w_j(t)$ are the corresponding mode-weights as function of time. To allow the motion state to be reconstructed from the low-rank representation at arbitrary time-points within the interval of specified snapshots, the mode-weights are represented as Taylor-series. .. dropdown:: Example Usage: .. code:: data, t = ... # get snapshots of states and corresponding time-points # Shapes: data.shape == (#particles, #snapshots, #channels) # t.shape == (#steps, ) pod_module = cmrsim.trajectory.PODTrajectoryModule(data, t, n_modes=5, poly_oder=8) new_time_grid = np.linspace(t[0].m, t[-1].m, 100).astype(np.float32) reconstructed_states, _ = pod_module(new_time_grid) ## reconstructed_states.shape == (#particles, 100, #channels) :param time_grid: (#time_steps) :param trajectories: (#particles, #time_steps, 3) :param n_modes: Number of modes used for reduce-order representation :param poly_order: Order of the Taylor-series used to fit the mode-weights :param additional_data: Dict[str, np.ndarray] of shape (#particles, #time_steps, #channels) :param batch_size: int :param is_periodic: """ #: Number of modes used for reduce-order representation n_modes: tf.Variable #: Computed basis-functions (modes) :math:`\phi_j` used to represent the input data in #: a reduced order. Shape (#particles * #channels, #modes) basis_function: tf.Variable #: Keeps track of the current timing when increment_particles is called. current_time_ms: tf.Variable #: Allows to only evaluate the position for a batch of stored particle trajectories current_batch_idx: tf.Variable #: Together with self.current_batch_size determines the subset of particle trajectories that #: is evaluated on call and increment_particles batch_size: tf.Variable #: Keeps track of the current timing when increment_particles is called. current_time_ms: tf.Variable #: ref_time: tf.Variable #: end_time: tf.Variable #: _nchannels: tf.Variable #: _additional_keys: Tuple[str] #: _additional_channels: Tuple[int] def __init__(self, time_grid: np.ndarray, trajectories: np.ndarray, n_modes: int, poly_order: int, additional_data: Dict[str, np.ndarray] = None, batch_size: int = None, is_periodic: bool = False): self.ref_time = tf.Variable(time_grid[0].astype(np.float32), dtype=tf.float32, shape=(), trainable=False) self.end_time = tf.Variable(time_grid[-1].astype(np.float32), dtype=tf.float32, shape=(), trainable=False) self.n_modes = tf.Variable(n_modes, shape=(), dtype=tf.int32) if additional_data is None: additional_data = {} self._additional_keys = tuple(additional_data.keys()) self._additional_channels = tuple([additional_data[k].shape[-1] for k in self._additional_keys]) self._additional_channel_idx = tuple([3, ] + np.cumsum(np.array(self._additional_channels)+3).tolist()) data = np.concatenate([trajectories, ] + [additional_data[k] for k in self._additional_keys], axis=-1) self._nchannels = tf.Variable(data.shape[2], shape=(), dtype=tf.int32) phi, mode_weights = self.calculate_pod(time_grid, data, n_modes, remove_mean=False) self.basis_function = tf.Variable(phi, shape=(None, n_modes), dtype=tf.float32) if batch_size is not None: self.batch_size = tf.Variable(batch_size, dtype=tf.int32, shape=(), trainable=False) else: self.batch_size = tf.Variable(data.shape[0], dtype=tf.int32, shape=(), trainable=False) self.current_batch_idx = tf.Variable(0, dtype=tf.int32, shape=(), trainable=False) self._taylor_module = TaylorTrajectoryN(order=poly_order, time_grid=time_grid, particle_trajectories=mode_weights.T.reshape(n_modes, -1, 1), batch_size=None, fit_on_init=True, is_periodic=False) self.current_time_ms = self._taylor_module.current_time_ms self.is_periodic = tf.constant(is_periodic, dtype=tf.bool)
[docs] @staticmethod def calculate_pod(time_grid: np.ndarray, data: np.ndarray, n_modes: int, remove_mean: bool = False) -> (np.ndarray, np.ndarray): """Computes the proper orthogonal decomposition of data snapshots at points defined in `time_grid`. Returns only the `n_modes` number of most significant modes :param time_grid: (#time_steps) time-points corresponding to snapshots :param data: (#particles, #time_steps, #channels) snapshots of data :param n_modes: number of most significant modes to return :param remove_mean: if true removes the temporal mean of all snapshots before computing POD :return: - POD base modes, shape: (#particles * #channels, n\_modes), - scaling of modes per time-step (#time_steps, n\_modes) """ n_tsteps = time_grid.shape[0] flat_sv = data.transpose(0, 2, 1).reshape(-1, n_tsteps) if remove_mean: sv_temporal_mean = np.mean(flat_sv, axis=1, keepdims=True) flat_sv -= sv_temporal_mean # compute square matrix C of shape (t, P) @ (P, t) -> (t, t) covariance_matrix = np.dot(flat_sv.T, flat_sv) # shapes: (t, ), (t, t) eigen_values, eigen_vectors = np.linalg.eigh(covariance_matrix) # Sort eigenvalues in descending order and take N largest -> shapes: (N,), (t, N) descending_sort_idx = np.argsort(eigen_values)[::-1][0:n_modes] eigen_values = eigen_values[descending_sort_idx] eigen_vectors = eigen_vectors[:, descending_sort_idx] # Scale eigen-vectors with inverse sqrt of eigen-value: # modes_cut = eigen_vectors.dot(np.diag(np.power(eigen_values, -0.5))) modes_cut = eigen_vectors / np.sqrt(eigen_values).reshape(1, -1) # (P*ch, t) @ (t, N) -> (P*ch, N) phi = np.dot(flat_sv, modes_cut) weights = np.einsum('pn, pt -> nt', flat_sv, phi) return phi, weights
[docs] def __call__(self, initial_positions: tf.Tensor, timing: tf.Tensor, batch_index: int = 0, **kwargs) -> (tf.Tensor, dict): """Reconstructs the data state at given times t, by evaluating the taylor series of mode-weights and computing the weighted sum. :param timing: (#timesteps) in milliseconds :return: (#particles, #timesteps, self._channels), {k: v} - additional data """ idx_before = self.current_batch_idx.read_value() self.current_batch_idx.assign(batch_index) data = self._evaluate_trajectory(timing) if len(self._additional_keys) > 0: additional_data = {k: data[self._additional_channel_idx[i:i+1]] for i, k in enumerate(self._additional_keys)} else: additional_data = {} self.current_batch_idx.assign(idx_before) return data[:, :, :3], additional_data
[docs] @tf.function def increment_particles(self, particle_positions: tf.Tensor, dt: tf.Tensor, **kwargs) -> (tf.Tensor, dict): """Evaluates the particle position for the time self.current_time_ms + dt and adds the delta t to the current_time_ms variable :param particle_positions: unused parameter (to adhere to calling signature of trajectory modules) :param dt: temporal step lengths :param kwargs: unused parameter (to adhere to calling signature of trajectory modules) :return: (#batch, self._channels), {k: v} - additional data """ self._taylor_module.current_time_ms.assign_add(dt) _t = tf.reshape(self._taylor_module.current_time_ms, [1, ]) data = self._evaluate_trajectory(_t) if len(self._additional_keys) > 0: additional_data = {k: data[self._additional_channel_idx[i:i+1]] for i, k in enumerate(self._additional_keys)} else: additional_data = {} return tf.ensure_shape(data[:, 0, :3], (None, 3)), additional_data
@tf.function(jit_compile=True) def _evaluate_trajectory(self, t: tf.Tensor) -> tf.Tensor: """Reconstructs the data state at given times t, by evaluating the taylor series of mode-weights and computing the weighted sum. :param t: (#steps) time-points to reconstruct at :return: data-states (#particles, #steps, #channels) """ t = t - self.ref_time if self.is_periodic: t = tf.math.floormod(t, self.end_time - self.ref_time) batch_start = self.current_batch_idx * self.batch_size * self._nchannels batch_end = batch_start + self.batch_size * self._nchannels # Evaluate mode weights from taylor-expansion -> shape: (t, N) mode_weights = tf.transpose(self._taylor_module._evaluate_trajectory(t)[:, :, 0], [1, 0]) # Compute weighted sum of basis functions / modes to reconstruct data values_at_t = tf.einsum('pn, tn -> pt', self.basis_function[batch_start:batch_end], mode_weights) # Reshape according to input shapes (#particles, #steps, #channels) _shape = tf.stack([-1, self._nchannels, tf.shape(mode_weights)[0]], axis=0) return tf.transpose(tf.reshape(values_at_t, _shape), [0, 2, 1])