Source code for hyperion.pdfs.mixtures.gmm

"""
 Copyright 2018 Johns Hopkins University  (Author: Jesus Villalba)
 Apache 2.0  (http://www.apache.org/licenses/LICENSE-2.0)
"""
import numpy as np
import h5py
import scipy.linalg as la
from scipy.special import erf


from ...hyp_defs import float_cpu
from ...utils.math import (
    softmax,
    logsumexp,
    invert_pdmat,
    invert_trimat,
    symmat2vec,
    vec2symmat,
    fullcov_varfloor,
    logdet_pdmat,
)
from ...utils.plotting import (
    plot_gaussian_1D,
    plot_gaussian_ellipsoid_2D,
    plot_gaussian_ellipsoid_3D,
    plot_gaussian_3D,
)
from ...clustering import KMeans

from ..core import Normal
from .exp_family_mixture import ExpFamilyMixture


[docs]class GMM(ExpFamilyMixture):
[docs] def __init__( self, mu=None, Lambda=None, var_floor=1e-3, update_mu=True, update_Lambda=True, **kwargs ): super().__init__(**kwargs) self.mu = mu self.Lambda = Lambda self.var_floor = var_floor self.update_mu = update_mu self.update_Lambda = update_Lambda self._compute_gmm_nat_std() self._logLambda = None self._cholLambda = None self._Sigma = None
def _compute_gmm_nat_std(self): if self.mu is not None and self.Lambda is not None: self._validate_mu() self._validate_Lambda() self._compute_nat_params() elif self.eta is not None: self._validate_eta() self.A = self.compute_A_nat(self.eta) self._compute_std_params()
[docs] def compute_Lambda_aux(self): self._logLambda = np.zeros((self.num_comp,), dtype=float_cpu()) self._cholLambda = np.zeros( (self.num_comp, self.x_dim, self.x_dim), dtype=float_cpu() ) for i, L in enumerate(self.Lambda): f, L, logL = invert_pdmat(L, return_logdet=True) self._logLambda[i] = logL self._cholLambda[i] = L.T
@property def logLambda(self): if self._logLambda is None: self.compute_Lambda_aux() return self._logLambda @property def cholLambda(self): if self._cholLambda is None: self.compute_Lambda_aux() return self._cholLambda @property def Sigma(self): if self._Sigma is None: self._Sigma = np.zeros( (self.num_comp, self.x_dim, self.x_dim), dtype=float_cpu() ) for k in range(self.num_comp): self._Sigma[k] = invert_pdmat(self.Lambda[k], return_inv=True)[-1] return self._Sigma
[docs] def initialize(self, x=None): if x is None and self.mu is None and self.eta is None: assert self.num_comp == 1 self._initialize_stdnormal() if x is not None: self._initialize_kmeans(self.num_comp, x) self.validate() self._compute_gmm_nat_std()
def _initialize_stdnormal(self): self.pi = np.array([1], dtype=float_cpu()) self.mu = np.zeros((1, self.x_dim), dtype=float_cpu()) self.Lambda = np.zeros((1, self.x_dim, self.x_dim), dtype=float_cpu()) self.Lambda[0] = np.eye(self.x_dim, dtype=float_cpu()) def _initialize_kmeans(self, num_comp, x): if num_comp == 1: self.pi = np.array([1], dtype=float_cpu()) self.mu = np.mean(x, axis=0, keepdims=True) self.Lambda = np.zeros((1, self.x_dim, self.x_dim), dtype=float_cpu()) delta = x - self.mu S = np.dot(delta.T, delta) / x.shape[0] self.Lambda[0] = invert_pdmat(S, return_inv=True)[-1] return kmeans = KMeans(num_clusters=num_comp) loss, cluster_index = kmeans.fit(x, epochs=100) self.mu = kmeans.mu self.pi = np.zeros((self.num_comp,), dtype=float_cpu()) self.Lambda = np.zeros( (self.num_comp, self.x_dim, self.x_dim), dtype=float_cpu() ) for k in range(num_comp): r = cluster_index == k self.pi[k] = np.sum(r) / x.shape[0] delta = x[r] - self.mu[k] S = np.dot(delta.T, delta) / np.sum(r) self.Lambda[k] = invert_pdmat(S, return_inv=True)[-1]
[docs] def stack_suff_stats(self, F, S=None): if S is None: return F return np.hstack((F, S))
[docs] def unstack_suff_stats(self, stats): F = stats[:, : self.x_dim] S = stats[:, self.x_dim :] return F, S
[docs] def norm_suff_stats(self, N, u_x, return_order2=False): F, S = self.unstack_suff_stats(u_x) F_norm = F - N[:, None] * self.mu for k in range(self.num_comp): F_norm[k] = np.dot(F_norm[k], self.cholLambda[k].T) if return_order2: SS = vec2symat(S[k]) Fmu = np.outer(self.F[k], self.mu[k]) SS = SS - Fmu - Fmu.T + N * np.outer(self.mu[k], self.mu[k]) SS = np.dot(self.cholLambda[k], np.dot(SS, self.cholLambda[k].T)) S[k] = symmat2vec(SS) if return_order2: return N, self.stack_suff_stats(F_norm, S) return N, F_norm
[docs] def Mstep(self, N, u_x): F, S = self.unstack_suff_stats(u_x) if self.update_mu: self.mu = F / N[:, None] if self.update_Lambda: C = np.zeros((self.num_comp, self.x_dim, self.x_dim), dtype=float_cpu()) for k in range(self.num_comp): C[k] = vec2symmat(S[k] / N[k]) C[k] -= np.outer(self.mu[k], self.mu[k]) Sfloor = self.var_floor * np.mean(C, axis=0) cholfloor = la.cholesky(Sfloor, overwrite_a=True) for k in range(self.num_comp): C[k] = fullcov_varfloor(C[k], cholfloor, F_is_chol=True) self.Lambda[k] = invert_pdmat(C[k], return_inv=True)[-1] self._Sigma = None self._logLambda = None self._cholLambda = None if self.update_pi: N0 = N < self.min_N if np.any(N0): N[N0] = 0 mu[N0] = 0 S[N0] = 1 self.pi = N / np.sum(N) self._log_pi = None self._compute_nat_params()
[docs] def split_comp(self, K=2): num_comp = self.num_comp * K pi = np.repeat(self.pi, K) / K Lambda = np.repeat(self.Lambda, K, axis=0) * (K ** 2) mu = np.repeat(self.mu, K, axis=0) for g in range(self.num_comp): w, v = la.eigh(self.Sigma[g]) v *= np.sqrt(v) if K == 2: std_dev = np.sum(v, axis=1) mu[2 * g] += std_dev mu[2 * g + 1] -= std_dev else: for k in range(K): factor = 2 * (np.random.uniform(size=(v.shape[1],)) > 0.5) - 1 std_dev = np.sum(v * factor, axis=1) mu[K * g + k] += std_dev config = self.get_config() return GMM(pi=pi, mu=mu, Lambda=Lambda, **config)
[docs] def log_prob_std(self, x): r0 = self.log_pi + 0.5 * self.logLambda - 0.5 * self.x_dim * np.log(2 * np.pi) llk_k = np.zeros((x.shape[0], self.num_comp), dtype=float_cpu()) for k in range(self.num_comp): mah_dist2 = np.sum(np.dot(x - self.mu[k], self.cholLambda[k]) ** 2, axis=1) llk_k[:, k] = r0[k] - 0.5 * mah_dist2 return logsumexp(llk_k, axis=-1)
[docs] def sample(self, num_samples, rng=None, seed=1024): if rng is None: rng = np.random.RandomState(seed) r = rng.multinomial(1, self.pi, size=(num_samples,)) x = np.zeros((num_samples, self.x_dim), dtype=float_cpu()) for k in range(self.num_comp): index = r[:, k] == 1 n_k = np.sum(index) if n_k == 0: continue x[index] = rng.multivariate_normal( self.mu[k], self.Sigma[k], size=(n_k,) ).astype(float_cpu()) return x
[docs] def get_config(self): config = { "var_floor": self.var_floor, "update_mu": self.update_mu, "update_lambda": self.update_Lambda, } base_config = super().get_config() return dict(list(base_config.items()) + list(config.items()))
[docs] def save_params(self, f): params = {"pi": self.pi, "mu": self.mu, "Lambda": self.Lambda} self._save_params_from_dict(f, params)
[docs] @classmethod def load_params(cls, f, config): param_list = ["pi", "mu", "Lambda"] params = cls._load_params_to_dict(f, config["name"], param_list) return cls( x_dim=config["x_dim"], pi=params["pi"], mu=params["mu"], Lambda=params["Lambda"], var_floor=config["var_floor"], min_N=config["min_n"], update_pi=config["update_pi"], update_mu=config["update_mu"], update_Lambda=config["update_lambda"], name=config["name"], )
[docs] @classmethod def load_from_kaldi(cls, file_path): pi = None eta1 = None eta2 = None num_comp = 0 x_dim = 0 success = False with open(file_path, "r") as f: while True: line = f.readline() if not line: break fields = line.rstrip().split() if fields[0] == "<WEIGHTS>": pi = np.array([float(v) for v in fields[2:-1]], dtype=float_cpu()) num_comp = len(pi) elif fields[0] == "<MEANS_INVCOVARS>": for k in range(num_comp): line = f.readline() fields = line.split() if x_dim == 0: x_dim = len(fields) eta1 = np.zeros((num_comp, x_dim), dtype=float_cpu()) eta2 = np.zeros( (num_comp, int((x_dim ** 2 + 3 * x_dim) / 2)), dtype=float_cpu(), ) assert len(fields) == x_dim or len(fields) == x_dim + 1 eta1[k] = [float(v) for v in fields[:x_dim]] elif fields[0] == "<INV_COVARS>": L = np.zeros((x_dim, x_dim), dtype=float_cpu()) for k in range(num_comp): L[:, :] = 0 for j in range(x_dim): line = f.readline() fields = line.split() if j < x_dim - 1: assert len(fields) == j + 1 else: assert len(fields) == x_dim + 1 L[j, : j + 1] = [float(v) for v in fields[: j + 1]] eta2[k] = -symmat2vec(L.T, diag_factor=0.5) if k == num_comp - 1: success = True assert success eta = np.hstack((eta1, eta2)) return cls(x_dim=x_dim, pi=pi, eta=eta)
def _validate_mu(self): assert self.mu.shape[0] == self.num_comp assert self.mu.shape[1] == self.x_dim def _validate_Lambda(self): assert self.Lambda.shape[0] == self.num_comp assert self.Lambda.shape[1] == self.x_dim assert self.Lambda.shape[2] == self.x_dim def _validate_eta(self): assert self.eta.shape[0] == self.num_comp assert self.eta.shape[1] == (self.x_dim ** 2 + 3 * self.x_dim) / 2
[docs] def validate(self): if self.pi is not None: self._validate_pi() if self.mu is not None and self.Lambda is not None: self._validate_mu() self._validate_Lambda() if self.eta is not None: self._validate_eta()
[docs] @staticmethod def compute_eta(mu, Lambda): x_dim = mu.shape[-1] eta_dim = int((x_dim ** 2 + 3 * x_dim) / 2) eta = np.zeros((mu.shape[0], eta_dim), dtype=float_cpu()) for k in range(mu.shape[0]): eta[k] = Normal.compute_eta(mu[k], Lambda[k]) return eta
[docs] @staticmethod def compute_std(eta): x_dim = Normal.compute_x_dim_from_eta(eta) mu = np.zeros((eta.shape[0], x_dim), dtype=float_cpu()) Lambda = np.zeros((eta.shape[0], x_dim, x_dim), dtype="float32") for k in range(eta.shape[0]): mu[k], Lambda[k] = Normal.compute_std(eta[k]) return mu, Lambda
[docs] @staticmethod def compute_A_nat(eta): A = np.zeros((eta.shape[0],), dtype=float_cpu()) for k in range(eta.shape[0]): A[k] = Normal.compute_A_nat(eta[k]) return A
[docs] @staticmethod def compute_A_std(mu, Lambda): A = np.zeros((mu.shape[0],), dtype=float_cpu()) for k in range(mu.shape[0]): A[k] = Normal.compute_A_std(mu[k], Lambda[k]) return A
def _compute_nat_params(self): self.eta = self.compute_eta(self.mu, self.Lambda) self.A = self.compute_A_nat(self.eta) def _compute_std_params(self): self.mu, self.Lambda = self.compute_std(self.eta) self._cholLambda = None self._logLambda = None self._Sigma = None
[docs] @staticmethod def compute_suff_stats(x): d = x.shape[1] u = np.zeros((x.shape[0], int(d + d * (d + 1) / 2)), dtype=float_cpu()) u[:, :d] = x k = d for i in range(d): for j in range(i, d): u[:, k] = x[:, i] * x[:, j] k += 1 return u
[docs] def plot1D(self, feat_idx=0, num_sigmas=2, num_pts=100, **kwargs): mu = self.mu[:, feat_idx] for k in range(mu.shape[0]): C = invert_pdmat(self.Lambda[k], return_inv=True)[-1][feat_idx, feat_idx] plot_gaussian_1D(mu[k], C, num_sigmas, num_pts, **kwargs)
[docs] def plot2D(self, feat_idx=[0, 1], num_sigmas=2, num_pts=100, **kwargs): mu = self.mu[:, feat_idx] j, i = np.meshgrid(feat_idx, feat_idx) for k in range(mu.shape[0]): C_k = invert_pdmat(self.Lambda[k], return_inv=True)[-1][i, j] plot_gaussian_ellipsoid_2D(mu[k], C_k, num_sigmas, num_pts, **kwargs)
[docs] def plot3D(self, feat_idx=[0, 1], num_sigmas=2, num_pts=100, **kwargs): mu = self.mu[:, feat_idx] j, i = np.meshgrid(feat_idx, feat_idx) for k in range(mu.shape[0]): C_k = invert_pdmat(self.Lambda[k], return_inv=True)[-1][i, j] plot_gaussian_3D(mu[k], C_k, num_sigmas, num_pts, **kwargs)
[docs] def plot3D_ellipsoid(self, feat_idx=[0, 1, 2], num_sigmas=2, num_pts=100, **kwargs): mu = self.mu[:, feat_idx] j, i = np.meshgrid(feat_idx, feat_idx) for k in range(mu.shape[0]): C_k = invert_pdmat(self.Lambda[k], return_inv=True)[-1][i, j] plot_gaussian_ellipsoid_3D(mu[k], C_k, num_sigmas, num_pts, **kwargs)