Source code for hyperion.utils.vad_utils

"""
 Copyright 2020 Johns Hopkins University  (Author: Jesus Villalba)
 Apache 2.0  (http://www.apache.org/licenses/LICENSE-2.0)
"""
import numpy as np

from ..hyp_defs import float_cpu


def _assert_sorted(t):
    delta = np.diff(t[:, 0])
    assert np.all(delta >= 0), "time-stamps must be sorted"


def _assert_pos_dur(t):
    delta = t[:, 1] - t[:, 0]
    assert np.all(delta >= 0), "segments must have positve duration"


[docs]def merge_vad_timestamps(in_timestamps, tol=0.001): """Merges vad timestamps that are contiguous Args: in_timestamps: original time-stamps in start-time, end-time format tol: tolerance, segments separted less than tol will be merged Returns: Merged timestamps """ # if empty return the same if in_timestamps.shape[0] == 0: return in_timestamps # assert segments are shorted by start time, and positive dur _assert_sorted(in_timestamps) _assert_pos_dur(in_timestamps) # assert segments are shorted by start time delta = np.diff(in_timestamps[:, 0]) assert np.all(delta >= 0), "time-stamps must be sorted" out_timestamps = np.zeros_like(in_timestamps) t_start = in_timestamps[0, 0] t_end = in_timestamps[0, 1] j = 0 for i in range(1, in_timestamps.shape[0]): t_start_i = in_timestamps[i, 0] t_end_i = in_timestamps[i, 1] if t_end >= t_start_i - tol: # we merge with previous if t_end_i > t_end: # this should be true always except odd cases t_end = t_end_i else: # new segment found # we write current segment to out_timestamps # and start new segment out_timestamps[j, 0] = t_start out_timestamps[j, 1] = t_end t_start = t_start_i t_end = t_end_i j += 1 # write final segment out_timestamps[j, 0] = t_start out_timestamps[j, 1] = t_end out_timestamps = out_timestamps[: j + 1] return out_timestamps
[docs]def bin_vad_to_timestamps( vad, frame_length, frame_shift, snip_edges=False, merge_tol=0.001 ): """Converts binary VAD to a list of start end time stamps Args: vad: Binary VAD frame_length: frame-length used to compute the VAD frame_shift: frame-shift used to compute the VAD snip_edges: if True, computing VAD used snip-edges option merge_tol: tolerance to merge contiguous segments Returns: VAD time stamps refered to the begining of the file """ if snip_edges: start = 0 else: start = -(frame_length - frame_shift) / 2 start_timestamps = np.asarray( [start + frame_shift * i for i in range(len(vad)) if vad[i]] )[:, None] end_timestamps = start_timestamps + frame_length start_timestamps[start_timestamps < 0] = 0 timestamps = np.concatenate((start_timestamps, end_timestamps), axis=1) return merge_vad_timestamps(timestamps, tol=merge_tol)
[docs]def vad_timestamps_to_bin( in_timestamps, frame_length, frame_shift, snip_edges=False, signal_length=None, max_frames=None, ): """Converts VAD time-stamps to a binary vector Args: in_timestamps: vad timestamps frame_length: frame-length used to compute the VAD frame_shift: frame-shift used to compute the VAD snip_edges: if True, computing VAD used snip-edges option signal_length: total duration of the signal, if None it takes it from the last timestamp max_frames: expected number of frames, if None it computes automatically Returns: Binary VAD np.array """ _assert_pos_dur(in_timestamps) if signal_length is None: signal_length = in_timestamps[-1, 1] else: assert signal_length >= in_timestamps[-1, 1] frame_center = frame_length / 2 if snip_edges: num_frames = int( np.floor((signal_length - frame_length + frame_shift) / frame_shift) ) pad = 0 else: num_frames = int(np.round(signal_length / frame_shift)) pad = -(frame_length - frame_shift) / 2 if max_frames is not None and num_frames < max_frames: num_frames = max_frames vad = np.zeros((num_frames,), dtype=np.bool) frame_start = np.ceil( (in_timestamps[:, 0] - (pad + frame_center)) / frame_shift ).astype(dtype=np.int) frame_end = ( np.floor((in_timestamps[:, 1] - (pad + frame_center)) / frame_shift).astype( dtype=np.int ) + 1 ) frame_start[frame_start < 0] = 0 frame_end[frame_end > num_frames] = num_frames for i, j in zip(frame_start, frame_end): if j > i: vad[i:j] = True if max_frames is not None and num_frames > max_frames: vad = vad[:max_frames] return vad
[docs]def timestamps_wrt_vad_to_absolute_timestamps(in_timestamps, vad_timestamps): """Converts time stamps relative to a signal with silence removed to absoulute time stamps in the original signal VAD is provided in start-end timestamps format also. Args: in_timestamps: time stamps relative to a signal with silence removed vad_timestamps: vad timestamps used to remove silence from signal Returns: Absolute VAD time-stamps """ bin_in = vad_timestamps_to_bin(in_timestamps, frame_length=0.001, frame_shift=0.001) bin_vad = vad_timestamps_to_bin( vad_timestamps, frame_length=0.001, frame_shift=0.001 ) bin_out = np.zeros_like(bin_vad) j = 0 max_j = len(bin_in) for i in range(len(bin_out)): if bin_vad[i]: bin_out[i] = bin_in[j] j += 1 if j == max_j: break out_timestamps = bin_vad_to_timestamps( bin_out, frame_length=0.001, frame_shift=0.001, merge_tol=0.001 ) return out_timestamps
[docs]def timestamps_wrt_bin_vad_to_absolute_timestamps( in_timestamps, vad, frame_length, frame_shift, snip_edges=False ): """Converts time stamps relative to a signal with silence removed to absoulute time stamps in the original signal VAD is provided in binary format Args: in_timestamps: time stamps relative to a signal with silence removed vad: Binary VAD frame_length: frame-length used to compute the VAD frame_shift: frame-shift used to compute the VAD snip_edges: if True, computing VAD used snip-edges option Returns: Absolute VAD time-stamps """ vad_timestamps = bin_vad_to_timestamps(vad, frame_length, frame_shift, snip_edges) return timestamps_wrt_vad_to_absolute_timestamps(in_timestamps, vad_timestamps)
[docs]def intersect_segment_timestamps_with_vad(in_timestamps, vad_timestamps): """Intersects a list of segment timestamps with a VAD time-stamps It returns only the segments that contain speech modifying the start and end times to remove silence from the segments. Args: in_timestamps: time stamps of a list of segments refered to time 0. vad_timestamps: vad timestamps Returns: Boolean array indicating which input segments contain speech Array of output segments with silence removed Array of indices, one index for each output segment indicating to which input speech segment correspond to. The index correspond to input segments after removing input segments that only contain silence. """ # if empty return the same if in_timestamps.shape[0] == 0: return in_timestamps # assert segments are shorted by start time, and positive dur _assert_sorted(in_timestamps) _assert_pos_dur(in_timestamps) _assert_sorted(vad_timestamps) _assert_pos_dur(vad_timestamps) vad_start = vad_timestamps[:, 0] vad_end = vad_timestamps[:, 1] num_vad_segs = len(vad_start) speech_idx = np.zeros((in_timestamps.shape[0],), dtype=np.bool) out_timestamps = [] out_timestamps2speech_segs = [] count_speech = 0 j = 0 for i, stamps in enumerate(in_timestamps): t_start, t_end = stamps is_speech = False while j < num_vad_segs and vad_end[j] <= t_start: j += 1 if j == num_vad_segs: break k = j while t_start < t_end: if k == num_vad_segs or vad_start[k] >= t_end or vad_end[k] <= t_start: break # print('...', vad_start[k], vad_end[k], t_start, t_end) is_speech = True if vad_start[k] <= t_start: if vad_end[k] < t_end: new_seg = [t_start, vad_end[k]] t_start = vad_end[k] else: new_seg = [t_start, t_end] t_start = t_end else: if vad_end[k] < t_end: new_seg = [vad_start[k], vad_end[k]] t_start = vad_end[k] else: new_seg = [vad_start[k], t_end] t_start = t_end out_timestamps.append(new_seg) # print('......', out_timestamps) out_timestamps2speech_segs.append(count_speech) k += 1 speech_idx[i] = is_speech if is_speech: count_speech += 1 out_timestamps = np.asarray(out_timestamps) out_timestamps2speech_segs = np.asarray(out_timestamps2speech_segs, dtype=np.int) return speech_idx, out_timestamps, out_timestamps2speech_segs