Source code for hyperion.pdfs.plda.plda_base

"""
 Copyright 2018 Johns Hopkins University  (Author: Jesus Villalba)
 Apache 2.0  (http://www.apache.org/licenses/LICENSE-2.0)
"""

import numpy as np

from abc import ABCMeta, abstractmethod

from ...hyp_defs import float_cpu
from ..core.pdf import PDF
from ...transforms import LNorm


[docs]class PLDABase(PDF): __metaclass__ = ABCMeta
[docs] def __init__(self, y_dim=None, mu=None, update_mu=True, **kwargs): super(PLDABase, self).__init__(**kwargs) self.mu = mu self.y_dim = y_dim self.update_mu = update_mu if mu is not None: self.x_dim = mu.shape[0]
[docs] @abstractmethod def initialize(self, D): pass
[docs] @abstractmethod def compute_py_g_x(self, D): pass
[docs] def fit( self, x, class_ids=None, ptheta=None, sample_weight=None, x_val=None, class_ids_val=None, ptheta_val=None, sample_weight_val=None, epochs=20, ml_md="ml+md", md_epochs=None, ): use_ml = False if ml_md == "md" else True use_md = False if ml_md == "ml" else True assert not (class_ids is None and ptheta is None) if class_ids is None: D = self.compute_stats_soft(x, ptheta) else: D = self.compute_stats_hard(x, class_ids) if x_val is not None: assert not (class_ids_val is None and ptheta_val is None) if class_ids_val is None: D_val = self.compute_stats_soft(x_val, ptheta_val) else: D_val = self.compute_stats_hard(x_val, class_ids_val) if not self.is_init: self.initialize(D) elbo = np.zeros((epochs,), dtype=float_cpu()) elbo_val = np.zeros((epochs,), dtype=float_cpu()) for epoch in range(epochs): stats = self.Estep(D) elbo[epoch] = self.elbo(stats) if x_val is not None: stats_val = self.Estep(D_val) elbo_val[epoch] = self.elbo(stats_val) if use_ml: self.MstepML(stats) if use_md and (md_epochs is None or epoch in md_epochs): self.MstepMD(stats) elbo_norm = elbo / np.sum(D[0]) if x_val is None: return elbo, elbo_norm else: elbo_val_norm = elbo_val / np.sum(D_val[0]) return elbo, elbo_norm, elbo_val, elbo_val_norm
[docs] @abstractmethod def Estep(self, x): pass
[docs] @abstractmethod def MstepML(self, x): pass
[docs] @abstractmethod def MstepMD(self, x): pass
[docs] @abstractmethod def llr_1vs1(self, x1, x2): pass
[docs] @abstractmethod def llr_NvsM_book(self, D1, D2): pass
[docs] def fit_adapt_weighted_avg_model( self, x, class_ids=None, ptheta=None, sample_weight=None, x_val=None, class_ids_val=None, ptheta_val=None, sample_weight_val=None, epochs=20, ml_md="ml+md", md_epochs=None, plda0=None, w_mu=1, w_B=0.5, w_W=0.5, ): assert self.is_init use_ml = False if ml_md == "md" else True use_md = False if ml_md == "ml" else True assert not (class_ids is None and ptheta is None) if class_ids is None: D = self.compute_stats_soft(x, ptheta) else: D = self.compute_stats_hard(x, class_ids) if x_val is not None: assert not (class_ids_val is None and ptheta_val is None) if class_ids_val is None: D_val = self.compute_stats_soft(x_val, ptheta_val) else: D_val = self.compute_stats_hard(x_val, class_ids_val) elbo = np.zeros((epochs,), dtype=float_cpu()) elbo_val = np.zeros((epochs,), dtype=float_cpu()) for epoch in range(epochs): stats = self.Estep(D) elbo[epoch] = self.elbo(stats) if x_val is not None: stats_val = self.Estep(D_val) elbo_val[epoch] = self.elbo(stats_val) if use_ml: self.MstepML(stats) if use_md and (md_epochs is None or epoch in md_epochs): self.MstepMD(stats) self.weighted_avg_model(plda0, w_mu, w_B, w_W) elbo_norm = elbo / np.sum(D[0]) if x_val is None: return elbo, elbo_norm else: elbo_val_norm = elbo_val / np.sum(D_val[0]) return elbo, elbo_norm, elbo_val, elbo_val_norm
[docs] def fit_adapt( self, x, class_ids=None, ptheta=None, sample_weight=None, x0=None, class_ids0=None, ptheta0=None, sample_weight0=None, x_val=None, class_ids_val=None, ptheta_val=None, sample_weight_val=None, epochs=20, ml_md="ml+md", md_epochs=None, ): assert self.is_init use_ml = False if ml_md == "md" else True use_md = False if ml_md == "ml" else True assert not (class_ids is None and ptheta is None) if class_ids is None: D = self.compute_stats_soft(x, ptheta) else: D = self.compute_stats_hard(x, class_ids) if x0 is not None: assert not (class_ids0 is None and ptheta0 is None) if class_ids0 is None: D0 = self.compute_stats_soft(x0, ptheta0) else: D0 = self.compute_stats_hard(x0, class_ids0) if x_val is not None: assert not (class_ids_val is None and ptheta_val is None) if class_ids_val is None: D_val = self.compute_stats_soft(x_val, ptheta_val) else: D_val = self.compute_stats_hard(x_val, class_ids_val) elbo = np.zeros((epochs,), dtype=float_cpu()) elbo_val = np.zeros((epochs,), dtype=float_cpu()) for epoch in range(epochs): stats = self.Estep(D) stats0 = self.Estep(D0) elbo[epoch] = self.elbo(stats) if x_val is not None: stats_val = self.Estep(D_val) elbo_val[epoch] = self.elbo(stats_val) if use_ml: self.MstepML(stats) if use_md and (md_epochs is None or epoch in md_epochs): self.MstepMD(stats) elbo_norm = elbo / np.sum(D[0]) if x_val is None: return elbo, elbo_norm else: elbo_val_norm = elbo_val / np.sum(D_val[0]) return elbo, elbo_norm, elbo_val, elbo_val_norm
[docs] @staticmethod def compute_stats_soft(x, p_theta, sample_weight=None, scal_factor=None): if sample_weight is not None: p_theta = sample_weight[:, None] * p_theta if scal_factor is not None: p_theta *= scal_factor N = np.sum(p_theta, axis=0) F = np.dot(p_theta.T, x) wx = np.sum(p_theta, axis=1, keepdims=True) * x S = np.dot(x.T, wx) return N, F, S
[docs] @staticmethod def compute_stats_hard(x, class_ids, sample_weight=None, scale_factor=None): x_dim = x.shape[1] num_classes = np.max(class_ids) + 1 N = np.zeros((num_classes,), dtype=float_cpu()) F = np.zeros((num_classes, x_dim), dtype=float_cpu()) if sample_weight is not None: wx = sample_weight[:, None] * x else: wx = x for i in range(num_classes): idx = class_ids == i if sample_weight is None: N[i] = np.sum(idx).astype(float_cpu()) F[i] = np.sum(x[idx], axis=0) else: N[i] = np.sum(sample_weight[idx]) F[i] = np.sum(wx[idx], axis=0) S = np.dot(x.T, wx) if scale_factor is not None: N *= scale_factor F *= scale_factor S *= scale_factor return N, F, S
[docs] @staticmethod def compute_stats_hard_v0(x, class_ids, sample_weight=None, scal_factor=None): x_dim = x.shape[1] num_classes = np.max(class_ids) + 1 p_theta = np.zeros((x.shape[0], num_classes), dtype=float_cpu()) p_theta[np.arange(x.shape[0]), class_ids] = 1 return PLDABase.compute_stats_soft(x, p_theta, sample_weight, scal_factor)
[docs] @staticmethod def center_stats(D, mu): N, F, S = D Fc = F - np.outer(N, mu) Fmu = np.outer(np.sum(F, axis=0), mu) Sc = S - Fmu - Fmu.T + np.sum(N) * np.outer(mu, mu) return N, Fc, Sc
[docs] def llr_NvsM(self, x1, x2, ids1=None, ids2=None, method="vavg-lnorm"): if method == "savg": return self.llr_NvsM_savg(x1, ids1, x2, ids2) D1 = x1 if ids1 is None else self.compute_stats_hard(x1, class_ids=ids1) D2 = x2 if ids2 is None else self.compute_stats_hard(x2, class_ids=ids2) if method == "book": return self.llr_NvsM_book(D1, D2) if method == "vavg": return self.llr_NvsM_vavg(D1, D2, do_lnorm=False) if method == "vavg-lnorm": return self.llr_NvsM_vavg(D1, D2, do_lnorm=True)
[docs] def llr_NvsM_vavg(self, D1, D2, do_lnorm=True): x1 = D1[1] / np.expand_dims(D1[0], axis=-1) x2 = D2[1] / np.expand_dims(D2[0], axis=-1) if do_lnorm: lnorm = LNorm() x1 = lnorm.predict(x1) x2 = lnorm.predict(x2) return self.llr_1vs1(x1, x2)
[docs] def llr_NvsM_savg(self, x1, ids1, x2, ids2): scores_1vs1 = self.llr_1vs1(x1, x2) N, F, _ = self.compute_stats_hard(scores_1vs1, ids1) scores_Nvs1 = F / N[:, None] N, F, _ = self.compute_stats_hard(scores_Nvs1.T, ids2) scores = F.T / N return scores
[docs] def llr_Nvs1(self, x1, x2, ids1=None, method="vavg-lnorm"): if method == "savg": return self.llr_Nvs1_savg(x1, ids1, x2) D1 = x1 if ids1 is None else self.compute_stats_hard(x1, class_ids=ids1) if method == "book": D2 = self.compute_stats_hard(x2, np.arange(x2.shape[0])) return self.llr_NvsM_book(D1, D2) if method == "vavg": return self.llr_Nvs1_vavg(D1, x2, do_lnorm=False) if method == "vavg-lnorm": return self.llr_Nvs1_vavg(D1, x2, do_lnorm=True)
[docs] def llr_Nvs1_vavg(self, D1, x2, do_lnorm=True): x1 = D1[1] / np.expand_dims(D1[0], axis=-1) if do_lnorm: lnorm = LNorm() x1 = lnorm.predict(x1) x2 = lnorm.predict(x2) return self.llr_1vs1(x1, x2)
[docs] def llr_Nvs1_savg(self, x1, ids1, x2): scores_1vs1 = self.llr_1vs1(x1, x2) N, F, _ = self.compute_stats_hard(scores_1vs1, ids1) scores = F / N[:, None] return scores
[docs] @abstractmethod def sample(self, num_classes, num_samples_per_class, rng=None, seed=1024): pass
[docs] def get_config(self): config = {"y_dim": self.y_dim, "update_mu": self.update_mu} base_config = super(PLDABase, self).get_config() return dict(list(base_config.items()) + list(config.items()))
[docs] def weigthed_avg_params(self, mu, w_mu): self.mu = w_mu * mu + (1 - w_mu) * self.mu
[docs] @abstractmethod def weigthed_avg_model(self, plda): pass