"""
Copyright 2020 Johns Hopkins University (Author: Jesus Villalba)
Apache 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
"""
import os.path as path
import logging
import copy
import numpy as np
import scipy.sparse as sparse
# import h5py
from ..hyp_defs import float_cpu
from .list_utils import *
from .trial_ndx import TrialNdx
from .trial_key import TrialKey
from .sparse_trial_key import SparseTrialKey
from .trial_scores import TrialScores
class SparseTrialScores(TrialScores):
    """Contains the scores for the speaker recognition trials.

    Bosaris compatible Scores, stored as sparse matrices.

    Attributes:
      model_set: List of model names.
      seg_set: List of test segment names.
      scores: Sparse matrix with the scores (num_models x num_segments).
      score_mask: Sparse boolean matrix with the trials with valid scores
        to True (num_models x num_segments).
    """

    def __init__(self, model_set=None, seg_set=None, scores=None, score_mask=None):
        """Stores the attributes by delegating to the TrialScores constructor.

        Args:
          model_set: List of model names.
          seg_set: List of test segment names.
          scores: Sparse matrix with the scores.
          score_mask: Sparse boolean matrix marking the valid scores.
        """
        super().__init__(model_set, seg_set, scores, score_mask)

    def save_h5(self, file_path):
        """Not implemented for sparse scores.

        Raises:
          NotImplementedError: Always.
        """
        raise NotImplementedError()

    def save_txt(self, file_path):
        """Saves object to txt file.

        One line is written per non-zero entry of score_mask, with the
        format "model segment score". Explicit zeros stored in
        self.score_mask are removed in place before writing.

        Args:
          file_path: File to write the list.
        """
        self.score_mask.eliminate_zeros()
        score_mask = self.score_mask.tocoo()
        with open(file_path, "w") as f:
            for r, c in zip(score_mask.row, score_mask.col):
                f.write(
                    "%s %s %f\n"
                    % (self.model_set[r], self.seg_set[c], self.scores[r, c])
                )

    @classmethod
    def load_h5(cls, file_path):
        """Not implemented for sparse scores.

        Raises:
          NotImplementedError: Always.
        """
        raise NotImplementedError()

    @classmethod
    def load_txt(cls, file_path):
        """Loads object from txt file.

        Each non-blank line must contain at least three whitespace separated
        fields: model name, segment name and score.

        Args:
          file_path: File to read the list.

        Returns:
          SparseTrialScores object.
        """
        with open(file_path, "r") as f:
            # Blank lines carry no trial; skipping them avoids an IndexError.
            fields = [line.split() for line in f if line.strip()]

        models = [i[0] for i in fields]
        segments = [i[1] for i in fields]
        # Parse the scores explicitly instead of relying on the sparse matrix
        # to convert strings on assignment.
        scores_v = np.array([float(i[2]) for i in fields], dtype=float_cpu())

        model_set, model_idx = np.unique(models, return_inverse=True)
        seg_set, seg_idx = np.unique(segments, return_inverse=True)

        scores = sparse.lil_matrix((len(model_set), len(seg_set)), dtype=float_cpu())
        score_mask = sparse.lil_matrix(scores.shape, dtype="bool")
        # Item assignment keeps the last value when a trial appears twice.
        for r, c, s in zip(model_idx, seg_idx, scores_v):
            score_mask[r, c] = True
            scores[r, c] = s

        return cls(model_set, seg_set, scores.tocsr(), score_mask.tocsr())

    @classmethod
    def merge(cls, scr_list):
        """Not implemented for sparse scores.

        Raises:
          NotImplementedError: Always.
        """
        raise NotImplementedError()

    def split(self, model_idx, num_model_parts, seg_idx, num_seg_parts):
        """Splits the TrialScores into num_model_parts x num_seg_parts and
        returns part (model_idx, seg_idx).

        Args:
          model_idx: Model index of the part to return from 1 to num_model_parts.
          num_model_parts: Number of parts to split the model list.
          seg_idx: Segment index of the part to return from 1 to num_seg_parts.
          num_seg_parts: Number of parts to split the test segment list.

        Returns:
          Subpart of the TrialScores
        """
        model_set, model_idx1 = split_list(self.model_set, model_idx, num_model_parts)
        seg_set, seg_idx1 = split_list(self.seg_set, seg_idx, num_seg_parts)
        ix = np.ix_(model_idx1, seg_idx1)
        scores = self.scores[ix]
        score_mask = self.score_mask[ix]
        return SparseTrialScores(model_set, seg_set, scores, score_mask)

    def validate(self):
        """Validates the attributes of the SparseTrialScores object.

        Converts model_set and seg_set with list2ndarray, checks that their
        elements are unique and that scores and score_mask have shape
        (num_models, num_segments). A missing scores matrix is replaced by an
        empty sparse matrix and a missing score_mask by an all-True mask.

        Raises:
          AssertionError: If any of the checks fails.
        """
        self.model_set = list2ndarray(self.model_set)
        self.seg_set = list2ndarray(self.seg_set)

        assert len(np.unique(self.model_set)) == len(self.model_set)
        assert len(np.unique(self.seg_set)) == len(self.seg_set)

        shape = (len(self.model_set), len(self.seg_set))
        if self.scores is None:
            self.scores = sparse.csr_matrix(shape, dtype=float_cpu())
        else:
            assert self.scores.shape == shape
            assert np.all(np.isfinite(self.scores.data))

        if self.score_mask is None:
            self.score_mask = sparse.csr_matrix(np.ones(shape, dtype="bool"))
        else:
            assert self.score_mask.shape == shape

    @staticmethod
    def _make_index_map(found, old_idx, num_old):
        """Builds a lookup table from old indices to new indices.

        Args:
          found: Boolean array, True where the requested element exists.
          old_idx: Old index of each requested element (ignored where
            found is False).
          num_old: Number of elements in the old set.

        Returns:
          Integer array of length num_old with the new index of each old
          element, or -1 for old elements that are not kept.
        """
        found = np.asarray(found, dtype=bool)
        old_idx = np.asarray(old_idx, dtype=np.int64)
        index_map = np.full(num_old, -1, dtype=np.int64)
        index_map[old_idx[found]] = np.flatnonzero(found)
        return index_map

    @staticmethod
    def _remap_sparse(mat, row_map, col_map, shape):
        """Reorders/subsets a sparse matrix using old-to-new index maps.

        Entries whose row or column maps to -1 are dropped.

        Args:
          mat: Sparse matrix to remap.
          row_map: Old-to-new row index map.
          col_map: Old-to-new column index map.
          shape: Shape of the output matrix.

        Returns:
          Remapped matrix in CSR format.
        """
        coo = mat.tocoo()
        new_row = row_map[coo.row]
        new_col = col_map[coo.col]
        keep = np.logical_and(new_row >= 0, new_col >= 0)
        return sparse.coo_matrix(
            (coo.data[keep], (new_row[keep], new_col[keep])), shape=shape
        ).tocsr()

    def filter(self, model_set, seg_set, keep=True, raise_missing=True):
        """Removes elements from TrialScores object.

        Args:
          model_set: List of models to keep or remove.
          seg_set: List of test segments to keep or remove.
          keep: If True, we keep the elements in model_set/seg_set,
                if False, we remove the elements in model_set/seg_set.
          raise_missing: Raises exception if there are elements in model_set or
                         seg_set that are not in the object.

        Returns:
          Filtered TrialScores object.

        Raises:
          Exception: If raise_missing is True and some requested model or
            segment is not in the object.
        """
        if not keep:
            model_set = np.setdiff1d(self.model_set, model_set)
            seg_set = np.setdiff1d(self.seg_set, seg_set)

        f_mod, mod_idx = ismember(model_set, self.model_set)
        f_seg, seg_idx = ismember(seg_set, self.seg_set)

        if not (np.all(f_mod) and np.all(f_seg)):
            for i in (f_mod == 0).nonzero()[0]:
                logging.info("model %s not found", model_set[i])
            for i in (f_seg == 0).nonzero()[0]:
                logging.info("segment %s not found", seg_set[i])
            if raise_missing:
                raise Exception("some scores were not computed")

        shape = (len(model_set), len(seg_set))
        # Old rows/columns that were not requested map to -1 so that their
        # entries are dropped instead of colliding with the re-indexed ones.
        row_map = self._make_index_map(f_mod, mod_idx, len(self.model_set))
        col_map = self._make_index_map(f_seg, seg_idx, len(self.seg_set))

        scores = self._remap_sparse(self.scores, row_map, col_map, shape)
        score_mask = self._remap_sparse(self.score_mask, row_map, col_map, shape)
        return SparseTrialScores(model_set, seg_set, scores, score_mask)

    @staticmethod
    def _ndx_to_mask(ndx):
        """Returns the sparse mask of the trials defined by ndx.

        Args:
          ndx: TrialNdx, SparseTrialKey or TrialKey object.

        Returns:
          Sparse matrix, non-zero at the trials of ndx, with explicit zeros
          removed.

        Raises:
          TypeError: If ndx is not one of the supported types.
        """
        # SparseTrialKey is tested before TrialKey, as in the original code.
        if isinstance(ndx, TrialNdx):
            mask = sparse.csr_matrix(ndx.trial_mask)
        elif isinstance(ndx, SparseTrialKey):
            mask = ndx.tar.maximum(ndx.non)
        elif isinstance(ndx, TrialKey):
            mask = sparse.csr_matrix(np.logical_or(ndx.tar, ndx.non))
        else:
            raise TypeError("ndx must be a TrialNdx, TrialKey or SparseTrialKey")
        mask.eliminate_zeros()
        return mask

    def align_with_ndx(self, ndx, raise_missing=True):
        """Aligns scores, model_set and seg_set with TrialNdx or TrialKey.

        Args:
          ndx: TrialNdx or TrialKey object.
          raise_missing: Raises exception if there are trials in ndx that are not
                         in the score object.

        Returns:
          Aligned TrialScores object.

        Raises:
          Exception: If raise_missing is True and some trial of ndx has no
            score.
        """
        scr = self.filter(
            ndx.model_set, ndx.seg_set, keep=True, raise_missing=raise_missing
        )
        mask = self._ndx_to_mask(ndx)
        # Keep only the scores of trials that are in ndx.
        scr.score_mask = mask.multiply(scr.score_mask)

        mask = mask.tocoo()
        missing_scores = False
        for r, c in zip(mask.row, mask.col):
            if not scr.score_mask[r, c]:
                missing_scores = True
                logging.info(
                    "missing-scores for %s %s", scr.model_set[r], scr.seg_set[c]
                )

        if missing_scores and raise_missing:
            raise Exception("some scores were not computed")

        return scr

    def get_tar_non(self, key):
        """Returns target and non target scores.

        Args:
          key: TrialKey object.

        Returns:
          Numpy array with target scores.
          Numpy array with non-target scores.
        """
        scr = self.align_with_ndx(key)
        tar_mask = scr.score_mask.multiply(key.tar)
        tar = np.array(scr.scores[tar_mask])[0]
        non_mask = scr.score_mask.multiply(key.non)
        non = np.array(scr.scores[non_mask])[0]
        return tar, non

    @classmethod
    def from_trial_scores(cls, scr):
        """Creates a SparseTrialScores object from a scores object.

        Args:
          scr: Object with model_set, seg_set, scores and score_mask
            attributes.

        Returns:
          SparseTrialScores object with scores and score_mask converted to
          CSR format and explicit zeros removed.
        """
        scores = sparse.csr_matrix(scr.scores)
        score_mask = sparse.csr_matrix(scr.score_mask)
        scores.eliminate_zeros()
        score_mask.eliminate_zeros()
        return cls(scr.model_set, scr.seg_set, scores, score_mask)

    def set_missing_to_value(self, ndx, val):
        """Aligns the scores with a TrialNdx and sets the trials with missing
        scores to the same value.

        Args:
          ndx: TrialNdx or TrialKey object.
          val: Value for the missing scores.

        Returns:
          Aligned SparseTrialScores object.
        """
        scr = self.align_with_ndx(ndx, raise_missing=False)
        mask = self._ndx_to_mask(ndx)
        mask_coo = mask.tocoo()

        # Insert into a LIL matrix: changing the sparsity structure of a CSR
        # matrix item by item is expensive.
        scores = scr.scores.tolil()
        for r, c in zip(mask_coo.row, mask_coo.col):
            if not scr.score_mask[r, c]:
                scores[r, c] = val
        scr.scores = scores.tocsr()

        scr.score_mask = mask
        return scr

    def __eq__(self, other):
        """Equal operator.

        Two objects are equal when they have the same model_set and seg_set,
        the same sparsity structure in scores and score_mask, equal mask
        values and scores that match within np.allclose(atol=1e-5).
        """
        if not np.array_equal(self.model_set, other.model_set):
            return False
        if not np.array_equal(self.seg_set, other.seg_set):
            return False

        # Compare the sparsity structure first: it guarantees that the data
        # arrays have the same length before they are compared element-wise.
        for a, b in (
            (self.scores, other.scores),
            (self.score_mask, other.score_mask),
        ):
            if not np.array_equal(a.indptr, b.indptr):
                return False
            if not np.array_equal(a.indices, b.indices):
                return False

        if not np.allclose(self.scores.data, other.scores.data, atol=1e-5):
            return False
        return bool(np.array_equal(self.score_mask.data, other.score_mask.data))