# Copyright The Lightning team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from functools import lru_cache
from typing import Any, Optional

import numpy as np
import torch
from torch import Tensor

from torchmetrics.utilities import rank_zero_info, rank_zero_warn
from torchmetrics.utilities.imports import _LIBROSA_AVAILABLE, _ONNXRUNTIME_AVAILABLE, _REQUESTS_AVAILABLE

if _LIBROSA_AVAILABLE and _ONNXRUNTIME_AVAILABLE and _REQUESTS_AVAILABLE:
    import librosa
    import onnxruntime as ort
    import requests
    from onnxruntime import InferenceSession
else:
    librosa, ort, requests = None, None, None  # type:ignore

    class InferenceSession:  # type:ignore
        """Dummy InferenceSession."""

        def __init__(self, **kwargs: Any) -> None: ...


__doctest_requires__ = {
    ("deep_noise_suppression_mean_opinion_score", "_load_session"): ["requests", "librosa", "onnxruntime"]
}

SAMPLING_RATE = 16000  # the DNSMOS models operate on 16 kHz audio
INPUT_LENGTH = 9.01  # length in seconds of the segments fed to the models
DNSMOS_DIR = "~/.torchmetrics/DNSMOS"  # directory where the downloaded ONNX files are cached


def _prepare_dnsmos(dnsmos_dir: str) -> None:
    """Download required DNSMOS files.

    Args:
        dnsmos_dir: the directory in which to save the downloaded files, e.g. ``~/.torchmetrics/DNSMOS``.

    """
    # https://raw.githubusercontent.com/microsoft/DNS-Challenge/master/DNSMOS/DNSMOS/model_v8.onnx
    # https://raw.githubusercontent.com/microsoft/DNS-Challenge/master/DNSMOS/DNSMOS/sig_bak_ovr.onnx
    # https://raw.githubusercontent.com/microsoft/DNS-Challenge/master/DNSMOS/pDNSMOS/sig_bak_ovr.onnx
    url = "https://raw.githubusercontent.com/microsoft/DNS-Challenge/master"
    dnsmos_dir = os.path.expanduser(dnsmos_dir)

    # save to or load from ~/.torchmetrics/DNSMOS/.
    for file in ["DNSMOS/DNSMOS/model_v8.onnx", "DNSMOS/DNSMOS/sig_bak_ovr.onnx", "DNSMOS/pDNSMOS/sig_bak_ovr.onnx"]:
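        # file[7:] drops the leading "DNSMOS/" so the files land under <dnsmos_dir>/DNSMOS/ and <dnsmos_dir>/pDNSMOS/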
        saveto = os.path.join(dnsmos_dir, file[7:])
        os.makedirs(os.path.dirname(saveto), exist_ok=True)
        if os.path.exists(saveto):
            # try loading onnx
            try:
                _ = InferenceSession(saveto)
                continue  # skip downloading if succeeded
            except Exception:
                os.remove(saveto)
        urlf = f"{url}/{file}"
        rank_zero_info(f"downloading {urlf} to {saveto}")
        myfile = requests.get(urlf)
        with open(saveto, "wb") as f:
            f.write(myfile.content)


def _load_session(
    path: str,
    device: torch.device,
    num_threads: Optional[int] = None,
) -> InferenceSession:
    """Load onnxruntime session.

    Args:
        path: the model path
        device: the device used
        num_threads: the number of threads to use. Defaults to None.

    Returns:
        onnxruntime session

    """
    path = os.path.expanduser(path)
    if not os.path.exists(path):
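        # download the DNSMOS ONNX files on first use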
        _prepare_dnsmos(DNSMOS_DIR)

    opts = ort.SessionOptions()
    if num_threads is not None:
        opts.inter_op_num_threads = num_threads
        opts.intra_op_num_threads = num_threads

    if device.type == "cpu":
        infs = InferenceSession(path, providers=["CPUExecutionProvider"], sess_options=opts)
    elif "CUDAExecutionProvider" in ort.get_available_providers():  # win or linux with cuda
        providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
        provider_options = [{"device_id": device.index}, {}]
        infs = InferenceSession(path, providers=providers, provider_options=provider_options, sess_options=opts)
    elif "CoreMLExecutionProvider" in ort.get_available_providers():  # macos with coreml
        providers = ["CoreMLExecutionProvider", "CPUExecutionProvider"]
        provider_options = [{"device_id": device.index}, {}]
        infs = InferenceSession(path, providers=providers, provider_options=provider_options, sess_options=opts)
    else:
        infs = InferenceSession(path, providers=["CPUExecutionProvider"], sess_options=opts)

    return infs


# cache loaded sessions keyed on (path, device, num_threads) so repeated metric calls reuse the same ONNX session
_cached_load_session = lru_cache()(_load_session)


def _audio_melspec(
    audio: np.ndarray,
    n_mels: int = 120,
    frame_size: int = 320,
    hop_length: int = 160,
    sr: int = 16000,
    to_db: bool = True,
) -> np.ndarray:
    """Calculate the mel-spectrogram of an audio.

    Args:
        audio: [..., T]
        n_mels: the number of mel-frequencies
        frame_size: stft length
        hop_length: stft hop length
        sr: sample rate of audio
        to_db: convert to dB scale if `True` is given

    Returns:
        mel-spectrogram: [..., num_mel, T']

    """
    shape = audio.shape
    audio = audio.reshape(-1, shape[-1])
    mel_spec = librosa.feature.melspectrogram(
        y=audio, sr=sr, n_fft=frame_size + 1, hop_length=hop_length, n_mels=n_mels
    )
    mel_spec = mel_spec.transpose(0, 2, 1)
    mel_spec = mel_spec.reshape(shape[:-1] + mel_spec.shape[1:])
    if to_db:
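        # power_to_db with ref=np.max clips values to [-80, 0] dB; (x + 40) / 40 rescales this to [-1, 1]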
        for b in range(mel_spec.shape[0]):
            mel_spec[b, ...] = (librosa.power_to_db(mel_spec[b], ref=np.max) + 40) / 40
    return mel_spec


def _polyfit_val(mos: np.ndarray, personalized: bool) -> np.ndarray:
    """Use polyfit to convert raw mos values to DNSMOS values.

    Args:
        mos: the raw mos values, [..., 4]
        personalized: whether the interfering speaker is penalized

    Returns:
        DNSMOS: [..., 4]

    """
    if personalized:
        p_ovr = np.poly1d([-0.00533021, 0.005101, 1.18058466, -0.11236046])
        p_sig = np.poly1d([-0.01019296, 0.02751166, 1.19576786, -0.24348726])
        p_bak = np.poly1d([-0.04976499, 0.44276479, -0.1644611, 0.96883132])
    else:
        p_ovr = np.poly1d([-0.06766283, 1.11546468, 0.04602535])
        p_sig = np.poly1d([-0.08397278, 1.22083953, 0.0052439])  # poly1d([v0, v1, v2]) = v0 * x**2 + v1 * x + v2
        p_bak = np.poly1d([-0.13166888, 1.60915514, -0.39604546])

    # index 0 (the P.808 MOS) is left unchanged; only SIG, BAK and OVRL are re-mapped
    mos[..., 1] = p_sig(mos[..., 1])
    mos[..., 2] = p_bak(mos[..., 2])
    mos[..., 3] = p_ovr(mos[..., 3])
    return mos


def deep_noise_suppression_mean_opinion_score(
    preds: Tensor,
    fs: int,
    personalized: bool,
    device: Optional[str] = None,
    num_threads: Optional[int] = None,
    cache_session: bool = True,
) -> Tensor:
    """Calculate `Deep Noise Suppression performance evaluation based on Mean Opinion Score`_ (DNSMOS).

    Human subjective evaluation is the "gold standard" to evaluate speech quality optimized for human perception.
    Perceptual objective metrics serve as a proxy for subjective scores. The conventional and widely used metrics
    require a reference clean speech signal, which is unavailable in real recordings. The no-reference approaches
    correlate poorly with human ratings and are not widely adopted in the research community. One of the biggest
    use cases of these perceptual objective metrics is to evaluate noise suppression algorithms. DNSMOS generalizes
    well in challenging test conditions with a high correlation to human ratings in stack ranking noise suppression
    methods. More details can be found in `DNSMOS paper <https://arxiv.org/abs/2010.15258>`_ and
    `DNSMOS P.835 paper <https://arxiv.org/abs/2110.01763>`_.


    .. hint::
        Using this metric requires you to have ``librosa``, ``onnxruntime`` and ``requests`` installed. Install
        as ``pip install torchmetrics['audio']`` or alternatively ``pip install librosa onnxruntime-gpu requests``
        (if you do not have a GPU-enabled machine, install ``onnxruntime`` instead of ``onnxruntime-gpu``).

    Args:
        preds: [..., time]
        fs: sampling frequency
        personalized: whether the interfering speaker is penalized
        device: the device used for calculating DNSMOS; can be ``cpu`` or ``cuda:n``, where ``n`` is the GPU index.
            If ``None`` is given, the device of ``preds`` is used.
        num_threads: the number of threads to use for cpu inference. Defaults to None.
        cache_session: whether to cache the ONNX session. By default this is ``True``, meaning that repeated calls
            to this method are faster than with ``False``; the trade-off is that the session stays cached in memory
            until the process terminates.

    Returns:
        Float tensor with shape ``(...,4)`` of DNSMOS values per sample, i.e. [p808_mos, mos_sig, mos_bak, mos_ovr]

    Raises:
        ModuleNotFoundError:
            If ``librosa``, ``onnxruntime`` or ``requests`` packages are not installed

    Example:
        >>> from torch import randn
        >>> from torchmetrics.functional.audio.dnsmos import deep_noise_suppression_mean_opinion_score
        >>> preds = randn(8000)
        >>> deep_noise_suppression_mean_opinion_score(preds, 8000, False)
        tensor([2.2..., 2.0..., 1.1..., 1.2...], dtype=torch.float64)

    """
    if not _LIBROSA_AVAILABLE or not _ONNXRUNTIME_AVAILABLE or not _REQUESTS_AVAILABLE:
        raise ModuleNotFoundError(
            "DNSMOS metric requires that librosa, onnxruntime and requests are installed."
            " Install as `pip install librosa onnxruntime-gpu requests`."
        )
    device = torch.device(device) if device is not None else preds.device

    _load_session_function = _cached_load_session if cache_session else _load_session
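    # two ONNX models are used: sig_bak_ovr.onnx predicts SIG, BAK and OVRL; model_v8.onnx predicts the P.808 MOS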
    onnx_sess = _load_session_function(
        f"{DNSMOS_DIR}/{'p' if personalized else ''}DNSMOS/sig_bak_ovr.onnx", device, num_threads
    )
    p808_onnx_sess = _load_session_function(f"{DNSMOS_DIR}/DNSMOS/model_v8.onnx", device, num_threads)

    desired_fs = SAMPLING_RATE
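    # the models operate on 16 kHz audio, so resample if the input sampling rate differs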
    if fs != desired_fs:
        audio = librosa.resample(preds.cpu().numpy(), orig_sr=fs, target_sr=desired_fs)
    else:
        audio = preds.cpu().numpy()

    len_samples = int(INPUT_LENGTH * desired_fs)
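    # tile the waveform until it is at least INPUT_LENGTH (9.01 s) long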
    while audio.shape[-1] < len_samples:
        audio = np.concatenate([audio, audio], axis=-1)

    # evaluate the models on overlapping INPUT_LENGTH-second windows with a one-second hop
    num_hops = int(np.floor(audio.shape[-1] / desired_fs) - INPUT_LENGTH) + 1

    moss = []
    hop_len_samples = desired_fs
    for idx in range(num_hops):
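        # window starting at second idx; a trailing window shorter than INPUT_LENGTH is skipped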
        audio_seg = audio[..., int(idx * hop_len_samples) : int((idx + INPUT_LENGTH) * hop_len_samples)]
        if audio_seg.shape[-1] < len_samples:
            continue
        shape = audio_seg.shape
        audio_seg = audio_seg.reshape((-1, shape[-1]))

        # raw waveform features for the sig/bak/ovr model; mel-spectrogram features for the P.808 model
        input_features = np.array(audio_seg).astype("float32")
        p808_input_features = np.array(_audio_melspec(audio=audio_seg[..., :-160])).astype("float32")

        if device.type != "cpu" and (
            "CUDAExecutionProvider" in ort.get_available_providers()
            or "CoreMLExecutionProvider" in ort.get_available_providers()
        ):
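            # move the inputs onto the GPU/CoreML device as OrtValues; fall back to the CPU arrays on failure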
            try:
                input_features = ort.OrtValue.ortvalue_from_numpy(input_features, device.type, device.index)
                p808_input_features = ort.OrtValue.ortvalue_from_numpy(p808_input_features, device.type, device.index)
            except Exception as e:
                rank_zero_warn(f"Failed to use GPU for DNSMOS, reverting to CPU. Error: {e}")

        oi = {"input_1": input_features}
        p808_oi = {"input_1": p808_input_features}
        mos_np = np.concatenate(
            [p808_onnx_sess.run(None, p808_oi)[0], onnx_sess.run(None, oi)[0]], axis=-1, dtype="float64"
        )
        mos_np = _polyfit_val(mos_np, personalized)

        mos_np = mos_np.reshape(shape[:-1] + (4,))
        moss.append(mos_np)
    return torch.from_numpy(np.mean(np.stack(moss, axis=-1), axis=-1))  # [p808_mos, mos_sig, mos_bak, mos_ovr]