|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
from functools import lru_cache |
|
from typing import Any, Optional |
|
|
|
import numpy as np |
|
import torch |
|
from torch import Tensor |
|
|
|
from torchmetrics.utilities import rank_zero_info, rank_zero_warn |
|
from torchmetrics.utilities.imports import _LIBROSA_AVAILABLE, _ONNXRUNTIME_AVAILABLE, _REQUESTS_AVAILABLE |
|
|
|
if _LIBROSA_AVAILABLE and _ONNXRUNTIME_AVAILABLE and _REQUESTS_AVAILABLE: |
|
import librosa |
|
import onnxruntime as ort |
|
import requests |
|
from onnxruntime import InferenceSession |
|
else: |
|
librosa, ort, requests = None, None, None |
|
|
|
class InferenceSession: |
|
"""Dummy InferenceSession.""" |
|
|
|
def __init__(self, **kwargs: dict[str, Any]) -> None: ... |
|
|
|
|
|
__doctest_requires__ = { |
|
("deep_noise_suppression_mean_opinion_score", "_load_session"): ["requests", "librosa", "onnxruntime"] |
|
} |
|
|
|
SAMPLING_RATE = 16000 |
|
INPUT_LENGTH = 9.01 |
|
DNSMOS_DIR = "~/.torchmetrics/DNSMOS" |
|
|
|
|
|
def _prepare_dnsmos(dnsmos_dir: str) -> None: |
|
"""Download required DNSMOS files. |
|
|
|
Args: |
|
dnsmos_dir: a dir to save the downloaded files. Defaults to "~/.torchmetrics". |
|
|
|
""" |
|
|
|
|
|
|
|
url = "https://raw.githubusercontent.com/microsoft/DNS-Challenge/master" |
|
dnsmos_dir = os.path.expanduser(dnsmos_dir) |
|
|
|
|
|
for file in ["DNSMOS/DNSMOS/model_v8.onnx", "DNSMOS/DNSMOS/sig_bak_ovr.onnx", "DNSMOS/pDNSMOS/sig_bak_ovr.onnx"]: |
|
saveto = os.path.join(dnsmos_dir, file[7:]) |
|
os.makedirs(os.path.dirname(saveto), exist_ok=True) |
|
if os.path.exists(saveto): |
|
|
|
try: |
|
_ = InferenceSession(saveto) |
|
continue |
|
except Exception as _: |
|
os.remove(saveto) |
|
urlf = f"{url}/{file}" |
|
rank_zero_info(f"downloading {urlf} to {saveto}") |
|
myfile = requests.get(urlf) |
|
with open(saveto, "wb") as f: |
|
f.write(myfile.content) |
|
|
|
|
|
def _load_session( |
|
path: str, |
|
device: torch.device, |
|
num_threads: Optional[int] = None, |
|
) -> InferenceSession: |
|
"""Load onnxruntime session. |
|
|
|
Args: |
|
path: the model path |
|
device: the device used |
|
num_threads: the number of threads to use. Defaults to None. |
|
|
|
Returns: |
|
onnxruntime session |
|
|
|
""" |
|
path = os.path.expanduser(path) |
|
if not os.path.exists(path): |
|
_prepare_dnsmos(DNSMOS_DIR) |
|
|
|
opts = ort.SessionOptions() |
|
if num_threads is not None: |
|
opts.inter_op_num_threads = num_threads |
|
opts.intra_op_num_threads = num_threads |
|
|
|
if device.type == "cpu": |
|
infs = InferenceSession(path, providers=["CPUExecutionProvider"], sess_options=opts) |
|
elif "CUDAExecutionProvider" in ort.get_available_providers(): |
|
providers = ["CUDAExecutionProvider", "CPUExecutionProvider"] |
|
provider_options = [{"device_id": device.index}, {}] |
|
infs = InferenceSession(path, providers=providers, provider_options=provider_options, sess_options=opts) |
|
elif "CoreMLExecutionProvider" in ort.get_available_providers(): |
|
providers = ["CoreMLExecutionProvider", "CPUExecutionProvider"] |
|
provider_options = [{"device_id": device.index}, {}] |
|
infs = InferenceSession(path, providers=providers, provider_options=provider_options, sess_options=opts) |
|
else: |
|
infs = InferenceSession(path, providers=["CPUExecutionProvider"], sess_options=opts) |
|
|
|
return infs |
|
|
|
|
|
_cached_load_session = lru_cache()(_load_session) |
|
|
|
|
|
def _audio_melspec( |
|
audio: np.ndarray, |
|
n_mels: int = 120, |
|
frame_size: int = 320, |
|
hop_length: int = 160, |
|
sr: int = 16000, |
|
to_db: bool = True, |
|
) -> np.ndarray: |
|
"""Calculate the mel-spectrogram of an audio. |
|
|
|
Args: |
|
audio: [..., T] |
|
n_mels: the number of mel-frequencies |
|
frame_size: stft length |
|
hop_length: stft hop length |
|
sr: sample rate of audio |
|
to_db: convert to dB scale if `True` is given |
|
|
|
Returns: |
|
mel-spectrogram: [..., num_mel, T'] |
|
|
|
""" |
|
shape = audio.shape |
|
audio = audio.reshape(-1, shape[-1]) |
|
mel_spec = librosa.feature.melspectrogram( |
|
y=audio, sr=sr, n_fft=frame_size + 1, hop_length=hop_length, n_mels=n_mels |
|
) |
|
mel_spec = mel_spec.transpose(0, 2, 1) |
|
mel_spec = mel_spec.reshape(shape[:-1] + mel_spec.shape[1:]) |
|
if to_db: |
|
for b in range(mel_spec.shape[0]): |
|
mel_spec[b, ...] = (librosa.power_to_db(mel_spec[b], ref=np.max) + 40) / 40 |
|
return mel_spec |
|
|
|
|
|
def _polyfit_val(mos: np.ndarray, personalized: bool) -> np.ndarray: |
|
"""Use polyfit to convert raw mos values to DNSMOS values. |
|
|
|
Args: |
|
mos: the raw mos values, [..., 4] |
|
personalized: whether interfering speaker is penalized |
|
|
|
Returns: |
|
DNSMOS: [..., 4] |
|
|
|
""" |
|
if personalized: |
|
p_ovr = np.poly1d([-0.00533021, 0.005101, 1.18058466, -0.11236046]) |
|
p_sig = np.poly1d([-0.01019296, 0.02751166, 1.19576786, -0.24348726]) |
|
p_bak = np.poly1d([-0.04976499, 0.44276479, -0.1644611, 0.96883132]) |
|
else: |
|
p_ovr = np.poly1d([-0.06766283, 1.11546468, 0.04602535]) |
|
p_sig = np.poly1d([-0.08397278, 1.22083953, 0.0052439]) |
|
p_bak = np.poly1d([-0.13166888, 1.60915514, -0.39604546]) |
|
|
|
mos[..., 1] = p_sig(mos[..., 1]) |
|
mos[..., 2] = p_bak(mos[..., 2]) |
|
mos[..., 3] = p_ovr(mos[..., 3]) |
|
return mos |
|
|
|
|
|
def deep_noise_suppression_mean_opinion_score( |
|
preds: Tensor, |
|
fs: int, |
|
personalized: bool, |
|
device: Optional[str] = None, |
|
num_threads: Optional[int] = None, |
|
cache_session: bool = True, |
|
) -> Tensor: |
|
"""Calculate `Deep Noise Suppression performance evaluation based on Mean Opinion Score`_ (DNSMOS). |
|
|
|
Human subjective evaluation is the βgold standardβ to evaluate speech quality optimized for human perception. |
|
Perceptual objective metrics serve as a proxy for subjective scores. The conventional and widely used metrics |
|
require a reference clean speech signal, which is unavailable in real recordings. The no-reference approaches |
|
correlate poorly with human ratings and are not widely adopted in the research community. One of the biggest |
|
use cases of these perceptual objective metrics is to evaluate noise suppression algorithms. DNSMOS generalizes |
|
well in challenging test conditions with a high correlation to human ratings in stack ranking noise suppression |
|
methods. More details can be found in `DNSMOS paper <https://arxiv.org/abs/2010.15258>`_ and |
|
`DNSMOS P.835 paper <https://arxiv.org/abs/2110.01763>`_. |
|
|
|
|
|
.. hint:: |
|
Using this metric requires you to have ``librosa``, ``onnxruntime`` and ``requests`` installed. Install |
|
as ``pip install torchmetrics['audio']`` or alternatively ``pip install librosa onnxruntime-gpu requests`` |
|
(if you do not have GPU enabled machine install ``onnxruntime`` instead of ``onnxruntime-gpu``) |
|
|
|
Args: |
|
preds: [..., time] |
|
fs: sampling frequency |
|
personalized: whether interfering speaker is penalized |
|
device: the device used for calculating DNSMOS, can be cpu or cuda:n, where n is the index of gpu. |
|
If None is given, then the device of input is used. |
|
num_threads: the number of threads to use for cpu inference. Defaults to None. |
|
cache_session: whether to cache the onnx session. By default this is true, meaning that repeated calls to this |
|
method is faster than if this was set to False, the consequence is that the session will be cached in |
|
memory until the process is terminated. |
|
|
|
Returns: |
|
Float tensor with shape ``(...,4)`` of DNSMOS values per sample, i.e. [p808_mos, mos_sig, mos_bak, mos_ovr] |
|
|
|
Raises: |
|
ModuleNotFoundError: |
|
If ``librosa``, ``onnxruntime`` or ``requests`` packages are not installed |
|
|
|
Example: |
|
>>> from torch import randn |
|
>>> from torchmetrics.functional.audio.dnsmos import deep_noise_suppression_mean_opinion_score |
|
>>> preds = randn(8000) |
|
>>> deep_noise_suppression_mean_opinion_score(preds, 8000, False) |
|
tensor([2.2..., 2.0..., 1.1..., 1.2...], dtype=torch.float64) |
|
|
|
""" |
|
if not _LIBROSA_AVAILABLE or not _ONNXRUNTIME_AVAILABLE or not _REQUESTS_AVAILABLE: |
|
raise ModuleNotFoundError( |
|
"DNSMOS metric requires that librosa, onnxruntime and requests are installed." |
|
" Install as `pip install librosa onnxruntime-gpu requests`." |
|
) |
|
device = torch.device(device) if device is not None else preds.device |
|
|
|
_load_session_function = _cached_load_session if cache_session else _load_session |
|
onnx_sess = _load_session_function( |
|
f"{DNSMOS_DIR}/{'p' if personalized else ''}DNSMOS/sig_bak_ovr.onnx", device, num_threads |
|
) |
|
p808_onnx_sess = _load_session_function(f"{DNSMOS_DIR}/DNSMOS/model_v8.onnx", device, num_threads) |
|
|
|
desired_fs = SAMPLING_RATE |
|
if fs != desired_fs: |
|
audio = librosa.resample(preds.cpu().numpy(), orig_sr=fs, target_sr=desired_fs) |
|
else: |
|
audio = preds.cpu().numpy() |
|
|
|
len_samples = int(INPUT_LENGTH * desired_fs) |
|
while audio.shape[-1] < len_samples: |
|
audio = np.concatenate([audio, audio], axis=-1) |
|
|
|
num_hops = int(np.floor(audio.shape[-1] / desired_fs) - INPUT_LENGTH) + 1 |
|
|
|
moss = [] |
|
hop_len_samples = desired_fs |
|
for idx in range(num_hops): |
|
audio_seg = audio[..., int(idx * hop_len_samples) : int((idx + INPUT_LENGTH) * hop_len_samples)] |
|
if audio_seg.shape[-1] < len_samples: |
|
continue |
|
shape = audio_seg.shape |
|
audio_seg = audio_seg.reshape((-1, shape[-1])) |
|
|
|
input_features = np.array(audio_seg).astype("float32") |
|
p808_input_features = np.array(_audio_melspec(audio=audio_seg[..., :-160])).astype("float32") |
|
|
|
if device.type != "cpu" and ( |
|
"CUDAExecutionProvider" in ort.get_available_providers() |
|
or "CoreMLExecutionProvider" in ort.get_available_providers() |
|
): |
|
try: |
|
input_features = ort.OrtValue.ortvalue_from_numpy(input_features, device.type, device.index) |
|
p808_input_features = ort.OrtValue.ortvalue_from_numpy(p808_input_features, device.type, device.index) |
|
except Exception as e: |
|
rank_zero_warn(f"Failed to use GPU for DNSMOS, reverting to CPU. Error: {e}") |
|
|
|
oi = {"input_1": input_features} |
|
p808_oi = {"input_1": p808_input_features} |
|
mos_np = np.concatenate( |
|
[p808_onnx_sess.run(None, p808_oi)[0], onnx_sess.run(None, oi)[0]], axis=-1, dtype="float64" |
|
) |
|
mos_np = _polyfit_val(mos_np, personalized) |
|
|
|
mos_np = mos_np.reshape(shape[:-1] + (4,)) |
|
moss.append(mos_np) |
|
return torch.from_numpy(np.mean(np.stack(moss, axis=-1), axis=-1)) |
|
|