"""
Copyright 2018 Johns Hopkins University (Author: Jesus Villalba)
Apache 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
"""
import sys
import numpy as np
import multiprocessing as threading
from ..hyp_defs import float_cpu
from ..utils.scp_list import SCPList
from ..utils.kaldi_matrix import KaldiMatrix, KaldiCompressedMatrix
from ..utils.kaldi_io_funcs import is_token, read_token, peek, init_kaldi_input_stream
from .data_reader import SequentialDataReader, RandomAccessDataReader
class SequentialArkDataReader(SequentialDataReader):
    """Common base for readers that go through Ark feature files in order.

    It keeps the handle of the archive currently being read and offers the
    file handling and shape helpers shared by the concrete sequential
    readers. ``read_shapes`` is used here but not defined in this class;
    concrete readers provide it.

    Attributes:
      file_path: ark or scp file to read.
      transform: TransformList object, applies a transformation to the
                 features after reading them from disk.
      part_idx: Index of the part of the input handled by this reader,
                where part_idx=1,...,num_parts.
      num_parts: Number of parts to split the input data.
      split_by_key: If True, all the elements with the same key go to the same part.
      f: Handle of the archive currently open, None when nothing is open.
      lock: Lock created from the module imported as ``threading``.
      cur_file: Path of the archive ``f`` was opened from.
    """

    def __init__(self, file_path, **kwargs):
        super().__init__(file_path, **kwargs)
        self.f = None
        self.lock = threading.Lock()
        self.cur_file = None

    def close(self):
        """Closes the archive that is currently open, if there is one."""
        if self.f is None:
            return
        self.f.close()
        self.f = None

    def _seek(self, offset):
        """Places the pointer of the open archive at an absolute position.

        The move is issued relative to the current position.

        Args:
          offset: Byte where we want to put the pointer.
        """
        handle = self.f
        handle.seek(offset - handle.tell(), 1)

    def _open_archive(self, file_path, offset=0):
        """Makes `file_path` the open archive and positions its pointer.

        When another archive (or none) is open, the previous one is closed
        and `file_path` is opened in binary mode. The pointer is only moved
        when `offset` is strictly positive.

        Args:
          file_path: File from which we want to read the next feature matrix.
          offset: Byte position where feature matrix is in the file.
        """
        already_open = self.f is not None and file_path == self.cur_file
        if not already_open:
            self.close()
            self.cur_file = file_path
            self.f = open(file_path, "rb")

        if offset > 0:
            self._seek(offset)

    def read_num_rows(self, num_records=0, assert_same_dim=True):
        """Reads the number of rows of the next feature matrices.

        Shapes with two entries contribute their first entry; any other
        shape counts as a single row.

        Args:
          num_records: How many matrix shapes to read, if num_records=0 it
                       reads all the matrices in the dataset.
          assert_same_dim: Forwarded to `read_shapes`; if True, it fails when
                           not all the matrices have the same number of columns.

        Returns:
          List of recording names.
          Integer numpy array with the number of rows of each recording.
        """
        keys, shapes = self.read_shapes(num_records, assert_same_dim)
        row_counts = [shape[0] if len(shape) == 2 else 1 for shape in shapes]
        return keys, np.array(row_counts, dtype=int)

    def read_dims(self, num_records=0, assert_same_dim=True):
        """Reads the number of columns of the next feature matrices.

        The number of columns is taken as the last entry of each shape.

        Args:
          num_records: How many matrix shapes to read, if num_records=0 it
                       reads all the matrices in the dataset.
          assert_same_dim: If True, it raises AssertionError when not all the
                           matrices have the same number of columns.

        Returns:
          List of recording names.
          Integer numpy array with the number of columns of each recording.
        """
        # The consistency check is done here, so read_shapes is asked to skip it.
        keys, shapes = self.read_shapes(num_records, False)
        dims = np.array([shape[-1] for shape in shapes], dtype=int)
        if not assert_same_dim or len(dims) == 0:
            return keys, dims

        assert np.all(dims == dims[0])
        return keys, dims
class SequentialArkFileDataReader(SequentialArkDataReader):
    """Class to read feature matrices/vectors in
       sequential order from a single Ark file.

    Attributes:
      file_path: Ark file to read.
      transform: TransformList object, applies a transformation to the
                 features after reading them from disk.
      part_idx: Index of the part of the input handled by this reader,
                where part_idx=1,...,num_parts.
      num_parts: Number of parts to split the input data. Values larger
                 than 1 are rejected by this reader.
      split_by_key: If True, all the elements with the same key go to the same part.
    """

    def __init__(self, file_path, **kwargs):
        """Opens the Ark file for sequential reading.

        Args:
          file_path: Ark file to read.
          **kwargs: Extra arguments forwarded to the parent constructor.

        Raises:
          NotImplementedError: If the reader is configured with num_parts > 1.
        """
        super().__init__(file_path, permissive=False, **kwargs)
        self._eof = False
        self._keys = None
        # Reject unsupported configurations before opening the archive so
        # that no file handle is left open when the constructor fails.
        if self.num_parts > 1:
            raise NotImplementedError(
                "Dataset splitting not available for %s" % self.__class__.__name__
            )
        self._open_archive(self.file_path)

    def reset(self):
        """Puts the file pointer back to the beginning of the file."""
        if self.f is not None:
            self.f.seek(0, 0)
            self._eof = False

    def eof(self):
        """Returns True when it reaches the end of the ark file
        or when the file is closed."""
        return self._eof or self.f is None

    @property
    def keys(self):
        """Recording names in the Ark file.

        The first access scans the whole file with `read_shapes` and caches
        the result; the file pointer is rewound before and after the scan.
        """
        if self._keys is None:
            self.reset()
            self._keys, _ = self.read_shapes()
            self.reset()

        return self._keys

    def read_shapes(self, num_records=0, assert_same_dim=True):
        """Reads the shapes of the next feature matrices of the dataset.

        Args:
          num_records: How many matrix shapes to read, if num_records=0 it
                       reads all the remaining matrices in the file.
          assert_same_dim: If True, it raises AssertionError when not all the
                           matrices have the same number of columns.

        Returns:
          List of recording names.
          List of tuples with the shapes.
        """
        keys = []
        shapes = []
        count = 0
        binary = False
        while num_records == 0 or count < num_records:
            key_i = read_token(self.f, binary)
            if key_i == "":
                # An empty token is treated as the end of the archive.
                self._eof = True
                break

            binary = init_kaldi_input_stream(self.f)
            shape_i = KaldiMatrix.read_shape(self.f, binary, sequential_mode=True)

            keys.append(key_i)
            shapes.append(shape_i)
            count += 1

        if assert_same_dim and len(shapes) > 0:
            dims = np.array([s[-1] for s in shapes], dtype=int)
            assert np.all(dims == dims[0])

        return keys, shapes

    def read(self, num_records=0, squeeze=False, row_offset=0, num_rows=0):
        """Reads next num_records feature matrices/vectors.

        Args:
          num_records: Number of feature matrices to read, if num_records=0
                       it reads until the end of the file.
          squeeze: If True, the list of matrices/vectors is passed through
                   `self._squeeze` before being returned (per the original
                   documentation it becomes a 3D/2D numpy array and all
                   matrices need to have the same number of rows).
          row_offset: Integer, or list/numpy array of integers with the
                      first row to read from each feature matrix. A
                      list/array is indexed by the position of the record
                      within this call.
          num_rows: Integer, or list/numpy array of integers with the
                    number of rows to read from each feature matrix.
                    If 0 it reads all the rows.

        Returns:
          key: List of recording names.
          data: List of feature matrices/vectors or 3D/2D numpy array.
        """
        row_offset_is_list = isinstance(row_offset, (list, np.ndarray))
        num_rows_is_list = isinstance(num_rows, (list, np.ndarray))

        keys = []
        data = []
        count = 0
        binary = False
        with self.lock:
            while num_records == 0 or count < num_records:
                key_i = read_token(self.f, binary)
                if key_i == "":
                    # An empty token is treated as the end of the archive.
                    self._eof = True
                    break

                # `count` is the position of the current record in this call;
                # it is the index into the per-record offset/length lists.
                row_offset_i = row_offset[count] if row_offset_is_list else row_offset
                num_rows_i = num_rows[count] if num_rows_is_list else num_rows

                binary = init_kaldi_input_stream(self.f)
                data_i = KaldiMatrix.read(
                    self.f, binary, row_offset_i, num_rows_i, sequential_mode=True
                ).to_ndarray()

                assert num_rows_i == 0 or data_i.shape[0] == num_rows_i

                if self.transform is not None:
                    data_i = self.transform.predict(data_i)

                keys.append(key_i)
                data.append(data_i)
                count += 1

        if squeeze:
            data = self._squeeze(data)

        return keys, data
class SequentialArkScriptDataReader(SequentialArkDataReader):
    """Class to read Ark feature files indexed by a scp file in
       sequential order.

    Attributes:
      file_path: scp file to read.
      path_prefix: If not None, this string is prepended to the file paths
                   listed in the scp file. This is useful when data
                   is read from a different directory than the one where
                   it was created.
      scp_sep: Separator for scp files (default ' ').
      transform: TransformList object, applies a transformation to the
                 features after reading them from disk.
      part_idx: Index of the part of the input handled by this reader,
                where part_idx=1,...,num_parts.
      num_parts: Number of parts to split the input data.
      split_by_key: If True, all the elements with the same key go to the same part.
    """

    def __init__(self, file_path, path_prefix=None, scp_sep=" ", **kwargs):
        """Loads the scp file and keeps only the part assigned to this reader.

        Args:
          file_path: scp file to read.
          path_prefix: Optional prefix added to the file paths in the scp.
          scp_sep: Separator for scp files (default ' ').
          **kwargs: Extra arguments forwarded to the parent constructor.
        """
        super().__init__(file_path, permissive=False, **kwargs)

        self.scp = SCPList.load(self.file_path, sep=scp_sep)

        if self.num_parts > 1:
            self.scp = self.scp.split(
                self.part_idx, self.num_parts, group_by_key=self.split_by_key
            )

        if path_prefix is not None:
            self.scp.add_prefix_to_filepath(path_prefix)

        # Position in the scp of the next element to read.
        self.cur_item = 0

    @property
    def keys(self):
        """Recording names listed in the scp."""
        return self.scp.key

    def reset(self):
        """Closes the open Ark file and puts the read pointer pointing
        to the first element in the scp file."""
        self.close()
        self.cur_item = 0

    def eof(self):
        """Returns True when all the elements in the scp have been read."""
        return self.cur_item == len(self.scp)

    def read_shapes(self, num_records=0, assert_same_dim=True):
        """Reads the shapes of the next feature matrices of the dataset.

        Args:
          num_records: How many matrix shapes to read, if num_records=0 it
                       reads all the remaining matrices in the dataset.
          assert_same_dim: If True, it raises AssertionError when not all the
                           matrices have the same number of columns.

        Returns:
          List of recording names.
          List of tuples with the shapes. Both lists are empty when there
          is nothing left to read.
        """
        if num_records == 0:
            num_records = len(self.scp) - self.cur_item

        keys = []
        shapes = []
        for _ in range(num_records):
            if self.eof():
                break

            key, file_path, offset, range_spec = self.scp[self.cur_item]

            # No extra offset/length is requested here, only the range
            # stored in the scp entry is taken into account.
            row_offset_i, num_rows_i = self._combine_ranges(range_spec, 0, 0)

            self._open_archive(file_path, offset)
            binary = init_kaldi_input_stream(self.f)
            shape_i = KaldiMatrix.read_shape(self.f, binary, sequential_mode=True)

            shape_i = self._apply_range_to_shape(shape_i, row_offset_i, num_rows_i)

            keys.append(key)
            shapes.append(shape_i)
            self.cur_item += 1

        # Skip the check when nothing was read (e.g. already at the end of
        # the scp); indexing dims[0] on an empty array would fail.
        if assert_same_dim and len(shapes) > 0:
            dims = np.array([s[-1] for s in shapes], dtype=int)
            assert np.all(dims == dims[0])

        return keys, shapes

    def read(self, num_records=0, squeeze=False, row_offset=0, num_rows=0):
        """Reads next num_records feature matrices/vectors.

        Args:
          num_records: Number of feature matrices to read, if num_records=0
                       it reads all the remaining ones.
          squeeze: If True, the list of matrices/vectors is passed through
                   `self._squeeze` before being returned (per the original
                   documentation it becomes a 3D/2D numpy array and all
                   matrices need to have the same number of rows).
          row_offset: Integer, or list/numpy array of integers with the
                      first row to read from each feature matrix. It is
                      combined with the range stored in the scp entry.
          num_rows: Integer, or list/numpy array of integers with the
                    number of rows to read from each feature matrix.
                    If 0 it reads all the rows.

        Returns:
          key: List of recording names.
          data: List of feature matrices/vectors or 3D/2D numpy array.
        """
        if num_records == 0:
            num_records = len(self.scp) - self.cur_item

        row_offset_is_list = isinstance(row_offset, (list, np.ndarray))
        num_rows_is_list = isinstance(num_rows, (list, np.ndarray))

        keys = []
        data = []
        with self.lock:
            for i in range(num_records):
                if self.eof():
                    break

                key, file_path, offset, range_spec = self.scp[self.cur_item]

                row_offset_i = row_offset[i] if row_offset_is_list else row_offset
                num_rows_i = num_rows[i] if num_rows_is_list else num_rows
                row_offset_i, num_rows_i = self._combine_ranges(
                    range_spec, row_offset_i, num_rows_i
                )

                self._open_archive(file_path, offset)
                binary = init_kaldi_input_stream(self.f)
                data_i = KaldiMatrix.read(
                    self.f, binary, row_offset_i, num_rows_i, sequential_mode=True
                ).to_ndarray()

                assert num_rows_i == 0 or data_i.shape[0] == num_rows_i

                if self.transform is not None:
                    data_i = self.transform.predict(data_i)

                keys.append(key)
                data.append(data_i)
                self.cur_item += 1

        if squeeze:
            data = self._squeeze(data)

        return keys, data
class RandomAccessArkDataReader(RandomAccessDataReader):
    """Class to read Ark files in random order, using scp file to
       index the Ark files.

    Attributes:
      file_path: scp file to read.
      path_prefix: If not None, this string is prepended to the file paths
                   listed in the scp file. This is useful when data
                   is read from a different directory than the one where
                   it was created.
      transform: TransformList object, applies a transformation to the
                 features after reading them from disk.
      permissive: If True, if the data that we want to read is not in the file
                  it returns an empty matrix, if False it raises an exception.
      scp_sep: Separator for scp files (default ' ').
    """

    def __init__(
        self, file_path, path_prefix=None, transform=None, permissive=False, scp_sep=" "
    ):
        super().__init__(file_path, transform, permissive)

        self.scp = SCPList.load(self.file_path, sep=scp_sep)
        if path_prefix is not None:
            self.scp.add_prefix_to_filepath(path_prefix)

        # archives holds the distinct Ark paths; archive_idx maps every scp
        # entry to the position of its Ark file in archives.
        archives, archive_idx = np.unique(self.scp.file_path, return_inverse=True)
        self.archives = archives
        self.archive_idx = archive_idx
        # One lazily opened handle and one lock per distinct Ark file.
        self.f = [None] * len(self.archives)
        self.locks = [threading.Lock() for _ in range(len(self.archives))]

    @property
    def keys(self):
        """Recording names listed in the scp."""
        return self.scp.key

    def close(self):
        """Closes all the open Ark files."""
        for f in self.f:
            if f is not None:
                f.close()
        self.f = [None] * len(self.f)

    def _open_archive(self, key_idx, offset=0):
        """Opens the Ark file corresponding to a given feature matrix
           if it is not already open and moves the file pointer to `offset`.

           If the file was already open, it only moves the file pointer.

        Args:
          key_idx: Integer position of the feature matrix in the scp file.
          offset: Byte where we can find the feature matrix in the Ark file.

        Returns:
          Python file object.
          Lock object corresponding to the file.
        """
        archive_idx = self.archive_idx[key_idx]
        with self.locks[archive_idx]:
            if self.f[archive_idx] is None:
                self.f[archive_idx] = open(self.archives[archive_idx], "rb")

            f = self.f[archive_idx]
            f.seek(offset, 0)

        return f, self.locks[archive_idx]

    def read_num_rows(self, keys, assert_same_dim=True):
        """Reads the number of rows in the feature matrices of the dataset.

        Shapes with two entries contribute their first entry; any other
        shape counts as a single row.

        Args:
          keys: List of recording names from which we want to retrieve the
                number of rows.
          assert_same_dim: If True, it raises AssertionError when not all the
                           matrices have the same number of columns.

        Returns:
          Integer numpy array with the number of rows for the recordings in keys.
        """
        shapes = self.read_shapes(keys, assert_same_dim)
        # Builtin int is used as dtype: the np.int alias no longer exists
        # in current NumPy releases.
        num_rows = np.array([s[0] if len(s) == 2 else 1 for s in shapes], dtype=int)
        return num_rows

    def read_dims(self, keys, assert_same_dim=True):
        """Reads the number of columns in the feature matrices of the dataset.

        Args:
          keys: List of recording names from which we want to retrieve the
                number of columns.
          assert_same_dim: If True, it raises AssertionError when not all the
                           matrices have the same number of columns.

        Returns:
          Integer numpy array with the number of columns for the recordings in keys.
        """
        # The consistency check is done here, so read_shapes is asked to skip it.
        shapes = self.read_shapes(keys, False)
        dims = np.array([s[-1] for s in shapes], dtype=int)
        # Nothing to compare when no keys were given.
        if assert_same_dim and len(dims) > 0:
            assert np.all(dims == dims[0])

        return dims

    def read_shapes(self, keys, assert_same_dim=True):
        """Reads the shapes in the feature matrices of the dataset.

        Args:
          keys: Recording name or list of recording names from which we
                want to retrieve the shapes.
          assert_same_dim: If True, it raises AssertionError when not all the
                           matrices have the same number of columns.

        Returns:
          List of tuples with the shapes for the recordings in keys.
          Keys missing from the scp get shape (0,) in permissive mode.

        Raises:
          Exception: If a key is not in the scp and the reader is not permissive.
        """
        if isinstance(keys, str):
            keys = [keys]

        shapes = []
        for key in keys:
            if key not in self.scp:
                if self.permissive:
                    shapes.append((0,))
                    continue
                else:
                    raise Exception("Key %s not found" % key)

            index = self.scp.get_index(key)
            _, file_path, offset, range_spec = self.scp[index]

            # No extra offset/length is requested here, only the range
            # stored in the scp entry is taken into account.
            row_offset_i, num_rows_i = self._combine_ranges(range_spec, 0, 0)

            f, lock = self._open_archive(index)
            with lock:
                # The seek is done while holding the lock, right before reading.
                f.seek(offset, 0)
                binary = init_kaldi_input_stream(f)
                shape_i = KaldiMatrix.read_shape(f, binary, sequential_mode=False)

            shape_i = self._apply_range_to_shape(shape_i, row_offset_i, num_rows_i)

            shapes.append(shape_i)

        # Nothing to compare when no keys were given.
        if assert_same_dim and len(shapes) > 0:
            dims = np.array([s[-1] for s in shapes], dtype=int)
            assert np.all(dims == dims[0])

        return shapes

    def read(self, keys, squeeze=False, row_offset=0, num_rows=0):
        """Reads the feature matrices/vectors for the recordings in keys.

        Args:
          keys: Recording name or list of recording names from which we
                want to retrieve the feature matrices/vectors.
          squeeze: If True, the list of matrices/vectors is passed through
                   `self._squeeze` before being returned (per the original
                   documentation it becomes a 3D/2D numpy array and all
                   matrices need to have the same number of rows).
          row_offset: Integer, or list/numpy array of integers with the
                      first row to read from each feature matrix. A
                      list/array must have one entry per key.
          num_rows: Integer, or list/numpy array of integers with the
                    number of rows to read from each feature matrix.
                    If 0 it reads all the rows. A list/array must have
                    one entry per key.

        Returns:
          data: List of feature matrices/vectors or 3D/2D numpy array.
                Keys missing from the scp get an empty array in
                permissive mode.

        Raises:
          Exception: If a key is not in the scp and the reader is not permissive.
        """
        if isinstance(keys, str):
            keys = [keys]

        row_offset_is_list = isinstance(row_offset, (list, np.ndarray))
        num_rows_is_list = isinstance(num_rows, (list, np.ndarray))
        if row_offset_is_list:
            assert len(row_offset) == len(keys)
        if num_rows_is_list:
            assert len(num_rows) == len(keys)

        data = []
        for i, key in enumerate(keys):
            if key not in self.scp:
                if self.permissive:
                    data.append(np.array([], dtype=float_cpu()))
                    continue
                else:
                    raise Exception("Key %s not found" % key)

            index = self.scp.get_index(key)
            _, file_path, offset, range_spec = self.scp[index]

            row_offset_i = row_offset[i] if row_offset_is_list else row_offset
            num_rows_i = num_rows[i] if num_rows_is_list else num_rows
            row_offset_i, num_rows_i = self._combine_ranges(
                range_spec, row_offset_i, num_rows_i
            )

            f, lock = self._open_archive(index)
            with lock:
                # The seek is done while holding the lock, right before reading.
                f.seek(offset, 0)
                binary = init_kaldi_input_stream(f)
                data_i = KaldiMatrix.read(
                    f, binary, row_offset_i, num_rows_i, sequential_mode=False
                ).to_ndarray()

            assert num_rows_i == 0 or data_i.shape[0] == num_rows_i

            if self.transform is not None:
                data_i = self.transform.predict(data_i)

            data.append(data_i)

        if squeeze:
            data = self._squeeze(data, self.permissive)

        return data