Source code for hyperion.io.h5_data_reader

"""
 Copyright 2018 Johns Hopkins University  (Author: Jesus Villalba)
 Apache 2.0  (http://www.apache.org/licenses/LICENSE-2.0)

 Classes to read data from hdf5 files.
"""

import sys
import time
import numpy as np
import h5py
import multiprocessing

from ..hyp_defs import float_cpu
from ..utils.list_utils import split_list, split_list_group_by_key
from ..utils.scp_list import SCPList
from ..utils.kaldi_matrix import KaldiMatrix, KaldiCompressedMatrix
from ..utils.kaldi_io_funcs import is_token
from .data_reader import SequentialDataReader, RandomAccessDataReader


def _read_h5_data(dset, row_offset=0, num_rows=0, transform=None):
    """Reads a feature matrix/vector from an hdf5 dataset.

    When the dataset carries a "data_format" attribute, the stored values
    are converted back to a plain array through KaldiCompressedMatrix
    (i.e., the data is decompressed if it was compressed).

    Args:
      dset: hdf5 dataset corresponding to a feature matrix/vector.
      row_offset: First row to read from the feature matrix.
      num_rows: Number of rows to read from the feature matrix.
                If 0, it reads all the rows starting at row_offset.
      transform: Object with a predict method (e.g. TransformList) applied
                 to the features after reading them from disk.

    Returns:
      Feature matrix/vector converted to a numpy array of type float_cpu(),
      or the output of transform.predict when a transform is given.
    """
    if row_offset > 0:
        # A None upper bound means "until the last row".
        last_row = None if num_rows == 0 else row_offset + num_rows
        feats = dset[row_offset:last_row]
    elif num_rows > 0:
        feats = dset[:num_rows]
    else:
        # No slicing requested: the dataset is materialized further below.
        feats = dset

    if "data_format" in dset.attrs:
        if not isinstance(feats, np.ndarray):
            feats = np.asarray(feats)
        compressed = KaldiCompressedMatrix.build_from_data_attrs(feats, dset.attrs)
        feats = compressed.to_ndarray()

    # Check that we obtained as many rows as requested.
    assert num_rows == 0 or feats.shape[0] == num_rows
    feats = np.asarray(feats, dtype=float_cpu())
    return feats if transform is None else transform.predict(feats)
class SequentialH5DataReader(SequentialDataReader):
    """Base class for readers that traverse hdf5 feature files sequentially.

    Subclasses are expected to provide read_shapes, which is used by
    read_num_rows and read_dims.

    Attributes:
      file_path: hdf5 or scp file to read.
      transform: TransformList object, applies a transformation to the
                 features after reading them from disk.
      part_idx: It splits the input into num_parts and reads only
                part part_idx, where part_idx=1,...,num_parts.
      num_parts: Number of parts to split the input data.
      split_by_key: If True, all the elements with the same key go to the same part.
    """

    def __init__(self, file_path, **kwargs):
        super().__init__(file_path, **kwargs)
        # Handle of the hdf5 file currently open (None when closed).
        self.f = None
        # Path of the hdf5 file that self.f refers to.
        self.cur_file = None
        # Position of the next item to read.
        self.cur_item = 0

    def close(self):
        """Closes the hdf5 file that is currently open, if any."""
        if self.f is None:
            return
        self.f.close()
        self.f = None

    def _open_archive(self, file_path):
        """Makes sure that the hdf5 file at file_path is the one open.

        If a different hdf5 file was open, it is closed first.

        Args:
          file_path: Path of the hdf5 file holding the next matrix/vector.
        """
        already_open = self.f is not None and file_path == self.cur_file
        if already_open:
            return
        self.close()
        self.cur_file = file_path
        self.f = h5py.File(file_path, "r")

    def read_num_rows(self, num_records=0, assert_same_dim=True):
        """Reads the number of rows of the next feature matrices.

        Args:
          num_records: How many matrices to inspect; if num_records=0 it
                       inspects all the remaining matrices in the dataset.
          assert_same_dim: Forwarded to read_shapes; if True, it fails when
                           not all the matrices have the same number of columns.

        Returns:
          List of recording names.
          Integer numpy array with the number of rows of each recording;
          items whose shape is not 2D count as 1 row.
        """
        keys, shapes = self.read_shapes(num_records, assert_same_dim)
        row_counts = []
        for shape in shapes:
            row_counts.append(shape[0] if len(shape) == 2 else 1)
        return keys, np.array(row_counts, dtype=int)

    def read_dims(self, num_records=0, assert_same_dim=True):
        """Reads the number of columns of the next feature matrices.

        Args:
          num_records: How many matrices to inspect; if num_records=0 it
                       inspects all the remaining matrices in the dataset.
          assert_same_dim: If True, it fails when not all the matrices
                           have the same number of columns.

        Returns:
          List of recording names.
          Integer numpy array with the number of columns of each recording.
        """
        # The check is done here, so read_shapes is asked not to repeat it.
        keys, shapes = self.read_shapes(num_records, False)
        last_dims = [shape[-1] for shape in shapes]
        dims = np.array(last_dims, dtype=np.int32)
        if assert_same_dim and len(dims) > 0:
            assert np.all(dims == dims[0])
        return keys, dims
class SequentialH5FileDataReader(SequentialH5DataReader):
    """Reads feature matrices/vectors sequentially from a single hdf5 file.

    Attributes:
      file_path: hdf5 file to read.
      transform: TransformList object, applies a transformation to the
                 features after reading them from disk.
      part_idx: It splits the input into num_parts and reads only
                part part_idx, where part_idx=1,...,num_parts.
      num_parts: Number of parts to split the input data.
      split_by_key: If True, all the elements with the same key go to the same part.
    """

    def __init__(self, file_path, **kwargs):
        super().__init__(file_path, permissive=False, **kwargs)
        self._open_archive(self.file_path)
        self._keys = list(self.f.keys())
        if self.num_parts > 1:
            # Keep only the keys that belong to the requested part.
            splitter = split_list_group_by_key if self.split_by_key else split_list
            self._keys, _ = splitter(self._keys, self.part_idx, self.num_parts)

    @property
    def keys(self):
        """List of dataset names that this reader iterates over."""
        return self._keys

    def reset(self):
        """Moves the read position back to the first key.

        It has no effect when the hdf5 file is not open.
        """
        if self.f is None:
            return
        self.cur_item = 0

    def eof(self):
        """Returns True when all the keys have been consumed."""
        return len(self._keys) == self.cur_item

    def read_shapes(self, num_records=0, assert_same_dim=True):
        """Reads the shapes of the next feature matrices in the file.

        Args:
          num_records: How many shapes to read; if num_records=0 it
                       reads the shapes of all the remaining matrices.
          assert_same_dim: If True, it fails when not all the matrices
                           have the same number of columns.

        Returns:
          List of recording names.
          List of tuples with the shape of each recording.
        """
        if num_records == 0:
            num_records = len(self._keys) - self.cur_item

        keys, shapes = [], []
        for _ in range(num_records):
            if self.eof():
                break
            name = self._keys[self.cur_item]
            keys.append(name)
            shapes.append(self.f[name].shape)
            self.cur_item += 1

        if assert_same_dim and shapes:
            dims = np.array([shape[-1] for shape in shapes], dtype=np.int32)
            assert np.all(dims == dims[0])

        return keys, shapes

    def read(self, num_records=0, squeeze=False, row_offset=0, num_rows=0):
        """Reads the next num_records feature matrices/vectors.

        Args:
          num_records: Number of feature matrices to read; if 0, it reads
                       all the remaining ones.
          squeeze: If True, the list of matrices/vectors is passed through
                   self._squeeze to obtain a 3D/2D numpy array.
                   All matrices need to have the same number of rows.
          row_offset: Integer, or list/numpy array of integers with the
                      first row to read from each feature matrix.
          num_rows: Integer, or list/numpy array of integers with the number
                    of rows to read from each feature matrix.
                    If 0 it reads all the rows.

        Returns:
          key: List of recording names.
          data: List of feature matrices/vectors or 3D/2D numpy array.
        """
        if num_records == 0:
            num_records = len(self._keys) - self.cur_item

        array_like = (list, np.ndarray)
        per_item_offset = isinstance(row_offset, array_like)
        per_item_rows = isinstance(num_rows, array_like)

        keys, data = [], []
        with self.lock:
            for i in range(num_records):
                if self.eof():
                    break
                name = self._keys[self.cur_item]
                first_row = row_offset[i] if per_item_offset else row_offset
                n_rows = num_rows[i] if per_item_rows else num_rows
                feats = _read_h5_data(self.f[name], first_row, n_rows, self.transform)
                # The position advances only after a successful read.
                self.cur_item += 1
                keys.append(name)
                data.append(feats)

        if squeeze:
            data = self._squeeze(data)

        return keys, data
class SequentialH5ScriptDataReader(SequentialH5DataReader):
    """Class to read features from multiple hdf5 files where a scp file
       indicates which hdf5 file contains each feature matrix.

    Attributes:
      file_path: scp file to read.
      path_prefix: If not None, it prepends the path_prefix string
                   to the file paths listed in the scp file.
                   This is useful when data is read from a different
                   directory than the one where it was created.
      scp_sep: Separator for scp files (default ' ').
      transform: TransformList object, applies a transformation to the
                 features after reading them from disk.
      part_idx: It splits the input into num_parts and reads only
                part part_idx, where part_idx=1,...,num_parts.
      num_parts: Number of parts to split the input data.
      split_by_key: If True, all the elements with the same key go to the same part.
    """

    def __init__(self, file_path, path_prefix=None, scp_sep=" ", **kwargs):
        super().__init__(file_path, permissive=False, **kwargs)

        self.scp = SCPList.load(self.file_path, sep=scp_sep)
        if self.num_parts > 1:
            self.scp = self.scp.split(
                self.part_idx, self.num_parts, group_by_key=self.split_by_key
            )

        if path_prefix is not None:
            self.scp.add_prefix_to_filepath(path_prefix)

    @property
    def keys(self):
        """Keys of the entries listed in the scp file."""
        return self.scp.key

    def reset(self):
        """Closes the open hdf5 file and puts the read pointer
        pointing to the first element in the scp file."""
        self.close()
        self.cur_item = 0

    def eof(self):
        """Returns True when all the elements in the scp have been read."""
        return self.cur_item == len(self.scp)

    def read_shapes(self, num_records=0, assert_same_dim=True):
        """Reads the shapes of the next feature matrices of the dataset.

        Args:
          num_records: How many shapes to read; if num_records=0 it
                       reads the shapes of all the remaining matrices.
          assert_same_dim: If True, it fails when not all the matrices
                           have the same number of columns.

        Returns:
          List of recording names.
          List of tuples with the shape of each recording, after applying
          the row range given in the scp entry.
        """
        if num_records == 0:
            num_records = len(self.scp) - self.cur_item

        keys = []
        shapes = []
        for _ in range(num_records):
            if self.eof():
                break

            key, file_path, _, range_spec = self.scp[self.cur_item]
            row_offset_i, num_rows_i = self._combine_ranges(range_spec, 0, 0)

            self._open_archive(file_path)
            shape_i = self.f[key].shape
            shape_i = self._apply_range_to_shape(shape_i, row_offset_i, num_rows_i)

            keys.append(key)
            shapes.append(shape_i)
            self.cur_item += 1

        # Guard against an empty result (e.g. reader already at eof):
        # dims[0] would raise IndexError on an empty array.
        if assert_same_dim and len(shapes) > 0:
            dims = np.array([s[-1] for s in shapes], dtype=np.int32)
            assert np.all(dims == dims[0])

        return keys, shapes

    def read(self, num_records=0, squeeze=False, row_offset=0, num_rows=0):
        """Reads the next num_records feature matrices/vectors.

        Args:
          num_records: Number of feature matrices to read; if 0, it reads
                       all the remaining ones.
          squeeze: If True, the list of matrices/vectors is passed through
                   self._squeeze to obtain a 3D/2D numpy array.
                   All matrices need to have the same number of rows.
          row_offset: Integer, or list/numpy array of integers with the
                      first row to read from each feature matrix.
          num_rows: Integer, or list/numpy array of integers with the number
                    of rows to read from each feature matrix.
                    If 0 it reads all the rows.

        Returns:
          key: List of recording names.
          data: List of feature matrices/vectors or 3D/2D numpy array.
        """
        if num_records == 0:
            num_records = len(self.scp) - self.cur_item

        row_offset_is_list = isinstance(row_offset, (list, np.ndarray))
        num_rows_is_list = isinstance(num_rows, (list, np.ndarray))

        keys = []
        data = []
        with self.lock:
            for i in range(num_records):
                if self.eof():
                    break

                key, file_path, _, range_spec = self.scp[self.cur_item]

                row_offset_i = row_offset[i] if row_offset_is_list else row_offset
                num_rows_i = num_rows[i] if num_rows_is_list else num_rows
                # Merge the range requested by the caller with the one
                # stored in the scp entry.
                row_offset_i, num_rows_i = self._combine_ranges(
                    range_spec, row_offset_i, num_rows_i
                )

                self._open_archive(file_path)
                dset_i = self.f[key]
                data_i = _read_h5_data(dset_i, row_offset_i, num_rows_i, self.transform)
                self.cur_item += 1

                keys.append(key)
                data.append(data_i)

        if squeeze:
            data = self._squeeze(data)

        return keys, data
class RandomAccessH5DataReader(RandomAccessDataReader):
    """Abstract base class to read hdf5 feature files in random order.

    Subclasses are expected to provide read_shapes, which is used by
    read_num_rows and read_dims.

    Attributes:
      file_path: hdf5 or scp file to read.
      transform: TransformList object, applies a transformation to the
                 features after reading them from disk.
      permissive: If True, if the data that we want to read is not in the file
                  it returns an empty matrix, if False it raises an exception.
    """

    def __init__(self, file_path, transform=None, permissive=False):
        super().__init__(file_path, transform, permissive)
        # hdf5 file handle(s); opened by the subclasses.
        self.f = None

    def read_num_rows(self, keys, assert_same_dim=True):
        """Reads the number of rows in the feature matrices of the dataset.

        Args:
          keys: List of recording names from which we want to retrieve the
                number of rows.
          assert_same_dim: Forwarded to read_shapes; if True, it fails when
                           not all the matrices have the same number of columns.

        Returns:
          Integer numpy array with the number of rows for the recordings
          in keys; items whose shape is not 2D count as 1 row.
        """
        shapes = self.read_shapes(keys, assert_same_dim)
        num_rows = np.array([s[0] if len(s) == 2 else 1 for s in shapes], dtype=int)
        return num_rows

    def read_dims(self, keys, assert_same_dim=True):
        """Reads the number of columns in the feature matrices of the dataset.

        Args:
          keys: List of recording names from which we want to retrieve the
                number of columns.
          assert_same_dim: If True, it fails when not all the matrices
                           have the same number of columns.

        Returns:
          Integer numpy array with the number of columns for the recordings
          in keys.
        """
        # The check is done here, so read_shapes is asked not to repeat it.
        shapes = self.read_shapes(keys, False)
        dims = np.array([s[-1] for s in shapes], dtype=np.int32)
        # An empty key list yields an empty array; dims[0] would raise
        # IndexError, so the check only runs when there is something to compare.
        if assert_same_dim and len(dims) > 0:
            assert np.all(dims == dims[0])

        return dims
class RandomAccessH5FileDataReader(RandomAccessH5DataReader):
    """Class to read from a single hdf5 file in random order.

    Attributes:
      file_path: hdf5 file to read.
      transform: TransformList object, applies a transformation to the
                 features after reading them from disk.
      permissive: If True, if the data that we want to read is not in the file
                  it returns an empty matrix, if False it raises an exception.
    """

    def __init__(self, file_path, **kwargs):
        super().__init__(file_path, **kwargs)
        # Lock held while datasets are read from the file.
        self.lock = multiprocessing.Lock()
        self._open_archive(file_path)

    def close(self):
        """Closes the hdf5 file."""
        if self.f is not None:
            self.f.close()
            self.f = None

    def _open_archive(self, file_path):
        """Opens the hdf5 file if it is not open.

        Args:
          file_path: Path of the hdf5 file.
        """
        if self.f is None:
            self.close()
            self.f = h5py.File(file_path, "r")

    @property
    def keys(self):
        """List with the names of the top-level entries of the hdf5 file."""
        return list(self.f.keys())

    def read_shapes(self, keys, assert_same_dim=True):
        """Reads the shapes in the feature matrices of the dataset.

        Args:
          keys: Recording name or list of recording names from which we
                want to retrieve the shapes.
          assert_same_dim: If True, it fails when not all the matrices
                           have the same number of columns.

        Returns:
          List of tuples with the shapes for the recordings in keys.
          In permissive mode, missing keys get the shape (0,).

        Raises:
          Exception: If a key is not in the file and permissive is False.
        """
        if isinstance(keys, str):
            keys = [keys]

        shapes = []
        for key in keys:
            if key not in self.f:
                if self.permissive:
                    shapes.append((0,))
                    continue
                raise Exception("Key %s not found" % key)

            shapes.append(self.f[key].shape)

        # An empty key list yields no shapes; dims[0] would raise
        # IndexError, so the check only runs when there is something to compare.
        if assert_same_dim and len(shapes) > 0:
            dims = np.array([s[-1] for s in shapes], dtype=np.int32)
            assert np.all(dims == dims[0])

        return shapes

    def read(self, keys, squeeze=False, row_offset=0, num_rows=0):
        """Reads the feature matrices/vectors for the recordings in keys.

        Args:
          keys: Recording name or list of recording names from which we
                want to retrieve the feature matrices/vectors.
          squeeze: If True, the list of matrices/vectors is passed through
                   self._squeeze to obtain a 3D/2D numpy array.
                   All matrices need to have the same number of rows.
          row_offset: Integer, or list/numpy array of integers with the
                      first row to read from each feature matrix.
          num_rows: Integer, or list/numpy array of integers with the number
                    of rows to read from each feature matrix.
                    If 0 it reads all the rows.

        Returns:
          data: List of feature matrices/vectors or 3D/2D numpy array.
                In permissive mode, missing keys get an empty array.

        Raises:
          Exception: If a key is not in the file and permissive is False.
        """
        if isinstance(keys, str):
            keys = [keys]

        row_offset_is_list = isinstance(row_offset, (list, np.ndarray))
        num_rows_is_list = isinstance(num_rows, (list, np.ndarray))
        if row_offset_is_list:
            assert len(row_offset) == len(keys)
        if num_rows_is_list:
            assert len(num_rows) == len(keys)

        data = []
        for i, key in enumerate(keys):
            if key not in self.f:
                if self.permissive:
                    data.append(np.array([], dtype=float_cpu()))
                    continue
                raise Exception("Key %s not found" % key)

            row_offset_i = row_offset[i] if row_offset_is_list else row_offset
            num_rows_i = num_rows[i] if num_rows_is_list else num_rows

            with self.lock:
                dset_i = self.f[key]
                data_i = _read_h5_data(dset_i, row_offset_i, num_rows_i, self.transform)
            data.append(data_i)

        if squeeze:
            data = self._squeeze(data, self.permissive)

        return data
class RandomAccessH5ScriptDataReader(RandomAccessH5DataReader):
    """Class to read multiple hdf5 files in random order, where a scp file
       indicates which hdf5 file contains each feature matrix.

    Attributes:
      file_path: scp file to read.
      path_prefix: If not None, it prepends the path_prefix string
                   to the file paths listed in the scp file.
                   This is useful when data is read from a different
                   directory than the one where it was created.
      transform: TransformList object, applies a transformation to the
                 features after reading them from disk.
      permissive: If True, if the data that we want to read is not in the file
                  it returns an empty matrix, if False it raises an exception.
      scp_sep: Separator for scp files (default ' ').
    """

    def __init__(self, file_path, path_prefix=None, scp_sep=" ", **kwargs):
        super().__init__(file_path, **kwargs)

        self.scp = SCPList.load(self.file_path, sep=scp_sep)
        if path_prefix is not None:
            self.scp.add_prefix_to_filepath(path_prefix)

        # archives: unique hdf5 paths; archive_idx: for each scp entry,
        # the position of its hdf5 file inside archives.
        archives, archive_idx = np.unique(self.scp.file_path, return_inverse=True)
        self.archives = archives
        self.archive_idx = archive_idx
        # One (lazily opened) file handle and one lock per archive.
        self.f = [None] * len(self.archives)
        self.locks = [multiprocessing.Lock() for _ in range(len(self.archives))]

    def close(self):
        """Closes all the open hdf5 files."""
        for f in self.f:
            if f is not None:
                f.close()
        self.f = [None] * len(self.f)

    @property
    def keys(self):
        """Keys of the entries listed in the scp file."""
        return self.scp.key

    def _open_archive(self, key_idx):
        """Opens the hdf5 file corresponding to a given feature matrix
           if it is not already open.

        Args:
          key_idx: Integer position of the feature matrix in the scp file.

        Returns:
          hdf5 file object that contains the feature matrix.
          Lock associated to that hdf5 file.
        """
        archive_idx = self.archive_idx[key_idx]
        with self.locks[archive_idx]:
            if self.f[archive_idx] is None:
                self.f[archive_idx] = h5py.File(self.archives[archive_idx], "r")

        return self.f[archive_idx], self.locks[archive_idx]

    def read_shapes(self, keys, assert_same_dim=True):
        """Reads the shapes in the feature matrices of the dataset.

        Args:
          keys: Recording name or list of recording names from which we
                want to retrieve the shapes.
          assert_same_dim: If True, it fails when not all the matrices
                           have the same number of columns.

        Returns:
          List of tuples with the shapes for the recordings in keys, after
          applying the row range given in the scp entry.
          In permissive mode, missing keys get the shape (0,).

        Raises:
          Exception: If a key is not in the scp file or in its hdf5 file
                     and permissive is False.
        """
        if isinstance(keys, str):
            keys = [keys]

        shapes = []
        for key in keys:
            if key not in self.scp:
                if self.permissive:
                    shapes.append((0,))
                    continue
                raise Exception("Key %s not found" % key)

            index = self.scp.get_index(key)
            _, _, _, range_spec = self.scp[index]
            row_offset_i, num_rows_i = self._combine_ranges(range_spec, 0, 0)

            f, lock = self._open_archive(index)
            # As in read(), the file is only accessed while holding its lock.
            with lock:
                if key not in f:
                    if self.permissive:
                        shapes.append((0,))
                        continue
                    raise Exception("Key %s not found" % key)

                shape_i = f[key].shape

            shape_i = self._apply_range_to_shape(shape_i, row_offset_i, num_rows_i)
            shapes.append(shape_i)

        # An empty key list yields no shapes; dims[0] would raise
        # IndexError, so the check only runs when there is something to compare.
        if assert_same_dim and len(shapes) > 0:
            dims = np.array([s[-1] for s in shapes], dtype=np.int32)
            assert np.all(dims == dims[0])

        return shapes

    def read(self, keys, squeeze=False, row_offset=0, num_rows=0):
        """Reads the feature matrices/vectors for the recordings in keys.

        Args:
          keys: Recording name or list of recording names from which we
                want to retrieve the feature matrices/vectors.
          squeeze: If True, the list of matrices/vectors is passed through
                   self._squeeze to obtain a 3D/2D numpy array.
                   All matrices need to have the same number of rows.
          row_offset: Integer, or list/numpy array of integers with the
                      first row to read from each feature matrix.
          num_rows: Integer, or list/numpy array of integers with the number
                    of rows to read from each feature matrix.
                    If 0 it reads all the rows.

        Returns:
          data: List of feature matrices/vectors or 3D/2D numpy array.
                In permissive mode, missing keys get an empty array.

        Raises:
          Exception: If a key is not in the scp file or in its hdf5 file
                     and permissive is False.
        """
        if isinstance(keys, str):
            keys = [keys]

        row_offset_is_list = isinstance(row_offset, (list, np.ndarray))
        num_rows_is_list = isinstance(num_rows, (list, np.ndarray))
        if row_offset_is_list:
            assert len(row_offset) == len(keys)
        if num_rows_is_list:
            assert len(num_rows) == len(keys)

        data = []
        for i, key in enumerate(keys):
            if key not in self.scp:
                if self.permissive:
                    data.append(np.array([], dtype=float_cpu()))
                    continue
                raise Exception("Key %s not found" % key)

            index = self.scp.get_index(key)
            _, _, _, range_spec = self.scp[index]

            row_offset_i = row_offset[i] if row_offset_is_list else row_offset
            num_rows_i = num_rows[i] if num_rows_is_list else num_rows
            # Merge the range requested by the caller with the one
            # stored in the scp entry.
            row_offset_i, num_rows_i = self._combine_ranges(
                range_spec, row_offset_i, num_rows_i
            )

            f, lock = self._open_archive(index)
            with lock:
                if key not in f:
                    if self.permissive:
                        data.append(np.array([], dtype=float_cpu()))
                        continue
                    raise Exception("Key %s not found" % key)

                dset_i = f[key]
                data_i = _read_h5_data(dset_i, row_offset_i, num_rows_i, self.transform)

            data.append(data_i)

        if squeeze:
            data = self._squeeze(data, self.permissive)

        return data