diff --git a/models.py b/models.py
index bf5db610..b75861fe 100644
--- a/models.py
+++ b/models.py
@@ -1,38 +1,41 @@
-
-
-import subprocess
-
-
-
-command1 = [
-    "aria2c",
-    "--console-log-level=error",
-    "-c",
-    "-x", "16",
-    "-s", "16",
-    "-k", "1M",
-    "https://huggingface.co/lj1995/VoiceConversionWebUI/resolve/main/hubert_base.pt",
-    "-d", "/content/Advanced-RVC-Inference",
-    "-o", "hubert_base.pt"
-]
-
-
-command2 = [
-    "aria2c",
-    "--console-log-level=error",
-    "-c",
-    "-x", "16",
-    "-s", "16",
-    "-k", "1M",
-    "https://huggingface.co/lj1995/VoiceConversionWebUI/resolve/main/rmvpe.pt",
-    "-d", "/content/Advanced-RVC-Inference",
-    "-o", "rmvpe.pt"
-]
-
-
-subprocess.run(command1)
-
-
-subprocess.run(command2)
-
-print("done")
+import os
+import subprocess
+
+def download_file(url, output_name, destination):
+    command = [
+        "aria2c",
+        "--console-log-level=error",
+        "-c",
+        "-x", "16",
+        "-s", "16",
+        "-k", "1M",
+        url,
+        "-d", destination,
+        "-o", output_name
+    ]
+    subprocess.run(command)
+
+if __name__ == "__main__":
+    current_directory = os.getcwd()
+
+    # List of files to download
+    files_to_download = [
+        {
+            "url": "https://huggingface.co/theNeofr/rvc-base/resolve/main/hubert_base.pt",
+            "output_name": "hubert_base.pt"
+        },
+        {
+            "url": "https://huggingface.co/theNeofr/rvc-base/resolve/main/rmvpe.pt",
+            "output_name": "rmvpe.pt"
+        },
+        {
+            "url": "https://huggingface.co/theNeofr/rvc-base/resolve/main/fcpe.pt",
+            "output_name": "fcpe.pt"
+        }
+    ]
+
+    # Download each file
+    for file in files_to_download:
+        download_file(file["url"], file["output_name"], current_directory)
+
+    print("Download completed.")
diff --git a/predictor/fcpe.py b/predictor/fcpe.py
new file mode 100644
index 00000000..08541bb8
--- /dev/null
+++ b/predictor/fcpe.py
@@ -0,0 +1,1036 @@
+from typing import Union
+
+import torch.nn.functional as F
+import numpy as np
+import torch
+import torch.nn as nn
+from torch.nn.utils.parametrizations import weight_norm
+from torchaudio.transforms import Resample
+import os
+import librosa
+import soundfile as sf
+import torch.utils.data
+from librosa.filters import mel as librosa_mel_fn
+import math
+from functools import partial
+
+from einops import rearrange, repeat
+from local_attention import LocalAttention
+from torch import nn
+
+os.environ["LRU_CACHE_CAPACITY"] = "3"
+
+
+def load_wav_to_torch(full_path, target_sr=None, return_empty_on_exception=False):
+    sampling_rate = None
+    try:
+        data, sampling_rate = sf.read(full_path, always_2d=True)  # than soundfile.
+    except Exception as error:
+        print(f"'{full_path}' failed to load with {error}")
+        if return_empty_on_exception:
+            return [], sampling_rate or target_sr or 48000
+        else:
+            raise Exception(error)
+
+    if len(data.shape) > 1:
+        data = data[:, 0]
+        assert (
+            len(data) > 2
+        )  # check duration of audio file is > 2 samples (because otherwise the slice operation was on the wrong dimension)
+
+    if np.issubdtype(data.dtype, np.integer):  # if audio data is type int
+        max_mag = -np.iinfo(
+            data.dtype
+        ).min  # maximum magnitude = min possible value of intXX
+    else:  # if audio data is type fp32
+        max_mag = max(np.amax(data), -np.amin(data))
+        max_mag = (
+            (2**31) + 1
+            if max_mag > (2**15)
+            else ((2**15) + 1 if max_mag > 1.01 else 1.0)
+        )  # data should be either 16-bit INT, 32-bit INT or [-1 to 1] float32
+
+    data = torch.FloatTensor(data.astype(np.float32)) / max_mag
+
+    if (
+        torch.isinf(data) | torch.isnan(data)
+    ).any() and return_empty_on_exception:  # resample will crash with inf/NaN inputs.
return_empty_on_exception will return empty arr instead of except + return [], sampling_rate or target_sr or 48000 + if target_sr is not None and sampling_rate != target_sr: + data = torch.from_numpy( + librosa.core.resample( + data.numpy(), orig_sr=sampling_rate, target_sr=target_sr + ) + ) + sampling_rate = target_sr + + return data, sampling_rate + + +def dynamic_range_compression(x, C=1, clip_val=1e-5): + return np.log(np.clip(x, a_min=clip_val, a_max=None) * C) + + +def dynamic_range_decompression(x, C=1): + return np.exp(x) / C + + +def dynamic_range_compression_torch(x, C=1, clip_val=1e-5): + return torch.log(torch.clamp(x, min=clip_val) * C) + + +def dynamic_range_decompression_torch(x, C=1): + return torch.exp(x) / C + + +class STFT: + def __init__( + self, + sr=22050, + n_mels=80, + n_fft=1024, + win_size=1024, + hop_length=256, + fmin=20, + fmax=11025, + clip_val=1e-5, + ): + self.target_sr = sr + + self.n_mels = n_mels + self.n_fft = n_fft + self.win_size = win_size + self.hop_length = hop_length + self.fmin = fmin + self.fmax = fmax + self.clip_val = clip_val + self.mel_basis = {} + self.hann_window = {} + + def get_mel(self, y, keyshift=0, speed=1, center=False, train=False): + sampling_rate = self.target_sr + n_mels = self.n_mels + n_fft = self.n_fft + win_size = self.win_size + hop_length = self.hop_length + fmin = self.fmin + fmax = self.fmax + clip_val = self.clip_val + + factor = 2 ** (keyshift / 12) + n_fft_new = int(np.round(n_fft * factor)) + win_size_new = int(np.round(win_size * factor)) + hop_length_new = int(np.round(hop_length * speed)) + if not train: + mel_basis = self.mel_basis + hann_window = self.hann_window + else: + mel_basis = {} + hann_window = {} + + mel_basis_key = str(fmax) + "_" + str(y.device) + if mel_basis_key not in mel_basis: + mel = librosa_mel_fn( + sr=sampling_rate, n_fft=n_fft, n_mels=n_mels, fmin=fmin, fmax=fmax + ) + mel_basis[mel_basis_key] = torch.from_numpy(mel).float().to(y.device) + + keyshift_key = str(keyshift) + "_" + str(y.device) + if keyshift_key not in hann_window: + hann_window[keyshift_key] = torch.hann_window(win_size_new).to(y.device) + + pad_left = (win_size_new - hop_length_new) // 2 + pad_right = max( + (win_size_new - hop_length_new + 1) // 2, + win_size_new - y.size(-1) - pad_left, + ) + if pad_right < y.size(-1): + mode = "reflect" + else: + mode = "constant" + y = torch.nn.functional.pad(y.unsqueeze(1), (pad_left, pad_right), mode=mode) + y = y.squeeze(1) + + spec = torch.stft( + y, + n_fft_new, + hop_length=hop_length_new, + win_length=win_size_new, + window=hann_window[keyshift_key], + center=center, + pad_mode="reflect", + normalized=False, + onesided=True, + return_complex=True, + ) + spec = torch.sqrt(spec.real.pow(2) + spec.imag.pow(2) + (1e-9)) + if keyshift != 0: + size = n_fft // 2 + 1 + resize = spec.size(1) + if resize < size: + spec = F.pad(spec, (0, 0, 0, size - resize)) + spec = spec[:, :size, :] * win_size / win_size_new + spec = torch.matmul(mel_basis[mel_basis_key], spec) + spec = dynamic_range_compression_torch(spec, clip_val=clip_val) + return spec + + def __call__(self, audiopath): + audio, sr = load_wav_to_torch(audiopath, target_sr=self.target_sr) + spect = self.get_mel(audio.unsqueeze(0)).squeeze(0) + return spect + + +stft = STFT() + +# import fast_transformers.causal_product.causal_product_cuda + + +def softmax_kernel( + data, *, projection_matrix, is_query, normalize_data=True, eps=1e-4, device=None +): + b, h, *_ = data.shape + # (batch size, head, length, model_dim) + + # normalize model 
dim + data_normalizer = (data.shape[-1] ** -0.25) if normalize_data else 1.0 + + # what is ration?, projection_matrix.shape[0] --> 266 + + ratio = projection_matrix.shape[0] ** -0.5 + + projection = repeat(projection_matrix, "j d -> b h j d", b=b, h=h) + projection = projection.type_as(data) + + # data_dash = w^T x + data_dash = torch.einsum("...id,...jd->...ij", (data_normalizer * data), projection) + + # diag_data = D**2 + diag_data = data**2 + diag_data = torch.sum(diag_data, dim=-1) + diag_data = (diag_data / 2.0) * (data_normalizer**2) + diag_data = diag_data.unsqueeze(dim=-1) + + if is_query: + data_dash = ratio * ( + torch.exp( + data_dash + - diag_data + - torch.max(data_dash, dim=-1, keepdim=True).values + ) + + eps + ) + else: + data_dash = ratio * ( + torch.exp(data_dash - diag_data + eps) + ) # - torch.max(data_dash)) + eps) + + return data_dash.type_as(data) + + +def orthogonal_matrix_chunk(cols, qr_uniform_q=False, device=None): + unstructured_block = torch.randn((cols, cols), device=device) + q, r = torch.linalg.qr(unstructured_block.cpu(), mode="reduced") + q, r = map(lambda t: t.to(device), (q, r)) + + # proposed by @Parskatt + # to make sure Q is uniform https://arxiv.org/pdf/math-ph/0609050.pdf + if qr_uniform_q: + d = torch.diag(r, 0) + q *= d.sign() + return q.t() + + +def exists(val): + return val is not None + + +def empty(tensor): + return tensor.numel() == 0 + + +def default(val, d): + return val if exists(val) else d + + +def cast_tuple(val): + return (val,) if not isinstance(val, tuple) else val + + +class PCmer(nn.Module): + """The encoder that is used in the Transformer model.""" + + def __init__( + self, + num_layers, + num_heads, + dim_model, + dim_keys, + dim_values, + residual_dropout, + attention_dropout, + ): + super().__init__() + self.num_layers = num_layers + self.num_heads = num_heads + self.dim_model = dim_model + self.dim_values = dim_values + self.dim_keys = dim_keys + self.residual_dropout = residual_dropout + self.attention_dropout = attention_dropout + + self._layers = nn.ModuleList([_EncoderLayer(self) for _ in range(num_layers)]) + + # METHODS ######################################################################################################## + + def forward(self, phone, mask=None): + + # apply all layers to the input + for i, layer in enumerate(self._layers): + phone = layer(phone, mask) + # provide the final sequence + return phone + + +# ==================================================================================================================== # +# CLASS _ E N C O D E R L A Y E R # +# ==================================================================================================================== # + + +class _EncoderLayer(nn.Module): + """One layer of the encoder. + + Attributes: + attn: (:class:`mha.MultiHeadAttention`): The attention mechanism that is used to read the input sequence. + feed_forward (:class:`ffl.FeedForwardLayer`): The feed-forward layer on top of the attention mechanism. + """ + + def __init__(self, parent: PCmer): + """Creates a new instance of ``_EncoderLayer``. + + Args: + parent (Encoder): The encoder that the layers is created for. + """ + super().__init__() + + self.conformer = ConformerConvModule(parent.dim_model) + self.norm = nn.LayerNorm(parent.dim_model) + self.dropout = nn.Dropout(parent.residual_dropout) + + # selfatt -> fastatt: performer! 
+ self.attn = SelfAttention( + dim=parent.dim_model, heads=parent.num_heads, causal=False + ) + + # METHODS ######################################################################################################## + + def forward(self, phone, mask=None): + + # compute attention sub-layer + phone = phone + (self.attn(self.norm(phone), mask=mask)) + + phone = phone + (self.conformer(phone)) + + return phone + + +def calc_same_padding(kernel_size): + pad = kernel_size // 2 + return (pad, pad - (kernel_size + 1) % 2) + + +# helper classes + + +class Swish(nn.Module): + def forward(self, x): + return x * x.sigmoid() + + +class Transpose(nn.Module): + def __init__(self, dims): + super().__init__() + assert len(dims) == 2, "dims must be a tuple of two dimensions" + self.dims = dims + + def forward(self, x): + return x.transpose(*self.dims) + + +class GLU(nn.Module): + def __init__(self, dim): + super().__init__() + self.dim = dim + + def forward(self, x): + out, gate = x.chunk(2, dim=self.dim) + return out * gate.sigmoid() + + +class DepthWiseConv1d(nn.Module): + def __init__(self, chan_in, chan_out, kernel_size, padding): + super().__init__() + self.padding = padding + self.conv = nn.Conv1d(chan_in, chan_out, kernel_size, groups=chan_in) + + def forward(self, x): + x = F.pad(x, self.padding) + return self.conv(x) + + +class ConformerConvModule(nn.Module): + def __init__( + self, dim, causal=False, expansion_factor=2, kernel_size=31, dropout=0.0 + ): + super().__init__() + + inner_dim = dim * expansion_factor + padding = calc_same_padding(kernel_size) if not causal else (kernel_size - 1, 0) + + self.net = nn.Sequential( + nn.LayerNorm(dim), + Transpose((1, 2)), + nn.Conv1d(dim, inner_dim * 2, 1), + GLU(dim=1), + DepthWiseConv1d( + inner_dim, inner_dim, kernel_size=kernel_size, padding=padding + ), + # nn.BatchNorm1d(inner_dim) if not causal else nn.Identity(), + Swish(), + nn.Conv1d(inner_dim, dim, 1), + Transpose((1, 2)), + nn.Dropout(dropout), + ) + + def forward(self, x): + return self.net(x) + + +def linear_attention(q, k, v): + if v is None: + out = torch.einsum("...ed,...nd->...ne", k, q) + return out + + else: + k_cumsum = k.sum(dim=-2) + # k_cumsum = k.sum(dim = -2) + D_inv = 1.0 / (torch.einsum("...nd,...d->...n", q, k_cumsum.type_as(q)) + 1e-8) + + context = torch.einsum("...nd,...ne->...de", k, v) + out = torch.einsum("...de,...nd,...n->...ne", context, q, D_inv) + return out + + +def gaussian_orthogonal_random_matrix( + nb_rows, nb_columns, scaling=0, qr_uniform_q=False, device=None +): + nb_full_blocks = int(nb_rows / nb_columns) + block_list = [] + + for _ in range(nb_full_blocks): + q = orthogonal_matrix_chunk( + nb_columns, qr_uniform_q=qr_uniform_q, device=device + ) + block_list.append(q) + + remaining_rows = nb_rows - nb_full_blocks * nb_columns + if remaining_rows > 0: + q = orthogonal_matrix_chunk( + nb_columns, qr_uniform_q=qr_uniform_q, device=device + ) + + block_list.append(q[:remaining_rows]) + + final_matrix = torch.cat(block_list) + + if scaling == 0: + multiplier = torch.randn((nb_rows, nb_columns), device=device).norm(dim=1) + elif scaling == 1: + multiplier = math.sqrt((float(nb_columns))) * torch.ones( + (nb_rows,), device=device + ) + else: + raise ValueError(f"Invalid scaling {scaling}") + + return torch.diag(multiplier) @ final_matrix + + +class FastAttention(nn.Module): + def __init__( + self, + dim_heads, + nb_features=None, + ortho_scaling=0, + causal=False, + generalized_attention=False, + kernel_fn=nn.ReLU(), + qr_uniform_q=False, + no_projection=False, + ): 
+ super().__init__() + nb_features = default(nb_features, int(dim_heads * math.log(dim_heads))) + + self.dim_heads = dim_heads + self.nb_features = nb_features + self.ortho_scaling = ortho_scaling + + self.create_projection = partial( + gaussian_orthogonal_random_matrix, + nb_rows=self.nb_features, + nb_columns=dim_heads, + scaling=ortho_scaling, + qr_uniform_q=qr_uniform_q, + ) + projection_matrix = self.create_projection() + self.register_buffer("projection_matrix", projection_matrix) + + self.generalized_attention = generalized_attention + self.kernel_fn = kernel_fn + + # if this is turned on, no projection will be used + # queries and keys will be softmax-ed as in the original efficient attention paper + self.no_projection = no_projection + + self.causal = causal + + @torch.no_grad() + def redraw_projection_matrix(self): + projections = self.create_projection() + self.projection_matrix.copy_(projections) + del projections + + def forward(self, q, k, v): + device = q.device + + if self.no_projection: + q = q.softmax(dim=-1) + k = torch.exp(k) if self.causal else k.softmax(dim=-2) + else: + create_kernel = partial( + softmax_kernel, projection_matrix=self.projection_matrix, device=device + ) + + q = create_kernel(q, is_query=True) + k = create_kernel(k, is_query=False) + + attn_fn = linear_attention if not self.causal else self.causal_linear_fn + if v is None: + out = attn_fn(q, k, None) + return out + else: + out = attn_fn(q, k, v) + return out + + +class SelfAttention(nn.Module): + def __init__( + self, + dim, + causal=False, + heads=8, + dim_head=64, + local_heads=0, + local_window_size=256, + nb_features=None, + feature_redraw_interval=1000, + generalized_attention=False, + kernel_fn=nn.ReLU(), + qr_uniform_q=False, + dropout=0.0, + no_projection=False, + ): + super().__init__() + assert dim % heads == 0, "dimension must be divisible by number of heads" + dim_head = default(dim_head, dim // heads) + inner_dim = dim_head * heads + self.fast_attention = FastAttention( + dim_head, + nb_features, + causal=causal, + generalized_attention=generalized_attention, + kernel_fn=kernel_fn, + qr_uniform_q=qr_uniform_q, + no_projection=no_projection, + ) + + self.heads = heads + self.global_heads = heads - local_heads + self.local_attn = ( + LocalAttention( + window_size=local_window_size, + causal=causal, + autopad=True, + dropout=dropout, + look_forward=int(not causal), + rel_pos_emb_config=(dim_head, local_heads), + ) + if local_heads > 0 + else None + ) + + self.to_q = nn.Linear(dim, inner_dim) + self.to_k = nn.Linear(dim, inner_dim) + self.to_v = nn.Linear(dim, inner_dim) + self.to_out = nn.Linear(inner_dim, dim) + self.dropout = nn.Dropout(dropout) + + @torch.no_grad() + def redraw_projection_matrix(self): + self.fast_attention.redraw_projection_matrix() + + def forward( + self, + x, + context=None, + mask=None, + context_mask=None, + name=None, + inference=False, + **kwargs, + ): + _, _, _, h, gh = *x.shape, self.heads, self.global_heads + + cross_attend = exists(context) + + context = default(context, x) + context_mask = default(context_mask, mask) if not cross_attend else context_mask + q, k, v = self.to_q(x), self.to_k(context), self.to_v(context) + + q, k, v = map(lambda t: rearrange(t, "b n (h d) -> b h n d", h=h), (q, k, v)) + (q, lq), (k, lk), (v, lv) = map(lambda t: (t[:, :gh], t[:, gh:]), (q, k, v)) + + attn_outs = [] + if not empty(q): + if exists(context_mask): + global_mask = context_mask[:, None, :, None] + v.masked_fill_(~global_mask, 0.0) + if cross_attend: + pass + else: + 
out = self.fast_attention(q, k, v) + attn_outs.append(out) + + if not empty(lq): + assert ( + not cross_attend + ), "local attention is not compatible with cross attention" + out = self.local_attn(lq, lk, lv, input_mask=mask) + attn_outs.append(out) + + out = torch.cat(attn_outs, dim=1) + out = rearrange(out, "b h n d -> b n (h d)") + out = self.to_out(out) + return self.dropout(out) + + +def l2_regularization(model, l2_alpha): + l2_loss = [] + for module in model.modules(): + if type(module) is nn.Conv2d: + l2_loss.append((module.weight**2).sum() / 2.0) + return l2_alpha * sum(l2_loss) + + +class FCPE(nn.Module): + def __init__( + self, + input_channel=128, + out_dims=360, + n_layers=12, + n_chans=512, + use_siren=False, + use_full=False, + loss_mse_scale=10, + loss_l2_regularization=False, + loss_l2_regularization_scale=1, + loss_grad1_mse=False, + loss_grad1_mse_scale=1, + f0_max=1975.5, + f0_min=32.70, + confidence=False, + threshold=0.05, + use_input_conv=True, + ): + super().__init__() + if use_siren is True: + raise ValueError("Siren is not supported yet.") + if use_full is True: + raise ValueError("Full model is not supported yet.") + + self.loss_mse_scale = loss_mse_scale if (loss_mse_scale is not None) else 10 + self.loss_l2_regularization = ( + loss_l2_regularization if (loss_l2_regularization is not None) else False + ) + self.loss_l2_regularization_scale = ( + loss_l2_regularization_scale + if (loss_l2_regularization_scale is not None) + else 1 + ) + self.loss_grad1_mse = loss_grad1_mse if (loss_grad1_mse is not None) else False + self.loss_grad1_mse_scale = ( + loss_grad1_mse_scale if (loss_grad1_mse_scale is not None) else 1 + ) + self.f0_max = f0_max if (f0_max is not None) else 1975.5 + self.f0_min = f0_min if (f0_min is not None) else 32.70 + self.confidence = confidence if (confidence is not None) else False + self.threshold = threshold if (threshold is not None) else 0.05 + self.use_input_conv = use_input_conv if (use_input_conv is not None) else True + + self.cent_table_b = torch.Tensor( + np.linspace( + self.f0_to_cent(torch.Tensor([f0_min]))[0], + self.f0_to_cent(torch.Tensor([f0_max]))[0], + out_dims, + ) + ) + self.register_buffer("cent_table", self.cent_table_b) + + # conv in stack + _leaky = nn.LeakyReLU() + self.stack = nn.Sequential( + nn.Conv1d(input_channel, n_chans, 3, 1, 1), + nn.GroupNorm(4, n_chans), + _leaky, + nn.Conv1d(n_chans, n_chans, 3, 1, 1), + ) + + # transformer + self.decoder = PCmer( + num_layers=n_layers, + num_heads=8, + dim_model=n_chans, + dim_keys=n_chans, + dim_values=n_chans, + residual_dropout=0.1, + attention_dropout=0.1, + ) + self.norm = nn.LayerNorm(n_chans) + + # out + self.n_out = out_dims + self.dense_out = weight_norm(nn.Linear(n_chans, self.n_out)) + + def forward( + self, mel, infer=True, gt_f0=None, return_hz_f0=False, cdecoder="local_argmax" + ): + """ + input: + B x n_frames x n_unit + return: + dict of B x n_frames x feat + """ + if cdecoder == "argmax": + self.cdecoder = self.cents_decoder + elif cdecoder == "local_argmax": + self.cdecoder = self.cents_local_decoder + if self.use_input_conv: + x = self.stack(mel.transpose(1, 2)).transpose(1, 2) + else: + x = mel + x = self.decoder(x) + x = self.norm(x) + x = self.dense_out(x) # [B,N,D] + x = torch.sigmoid(x) + if not infer: + gt_cent_f0 = self.f0_to_cent(gt_f0) # mel f0 #[B,N,1] + gt_cent_f0 = self.gaussian_blurred_cent(gt_cent_f0) # #[B,N,out_dim] + loss_all = self.loss_mse_scale * F.binary_cross_entropy( + x, gt_cent_f0 + ) # bce loss + # l2 regularization + if 
self.loss_l2_regularization: + loss_all = loss_all + l2_regularization( + model=self, l2_alpha=self.loss_l2_regularization_scale + ) + x = loss_all + if infer: + x = self.cdecoder(x) + x = self.cent_to_f0(x) + if not return_hz_f0: + x = (1 + x / 700).log() + return x + + def cents_decoder(self, y, mask=True): + B, N, _ = y.size() + ci = self.cent_table[None, None, :].expand(B, N, -1) + rtn = torch.sum(ci * y, dim=-1, keepdim=True) / torch.sum( + y, dim=-1, keepdim=True + ) # cents: [B,N,1] + if mask: + confident = torch.max(y, dim=-1, keepdim=True)[0] + confident_mask = torch.ones_like(confident) + confident_mask[confident <= self.threshold] = float("-INF") + rtn = rtn * confident_mask + if self.confidence: + return rtn, confident + else: + return rtn + + def cents_local_decoder(self, y, mask=True): + B, N, _ = y.size() + ci = self.cent_table[None, None, :].expand(B, N, -1) + confident, max_index = torch.max(y, dim=-1, keepdim=True) + local_argmax_index = torch.arange(0, 9).to(max_index.device) + (max_index - 4) + local_argmax_index[local_argmax_index < 0] = 0 + local_argmax_index[local_argmax_index >= self.n_out] = self.n_out - 1 + ci_l = torch.gather(ci, -1, local_argmax_index) + y_l = torch.gather(y, -1, local_argmax_index) + rtn = torch.sum(ci_l * y_l, dim=-1, keepdim=True) / torch.sum( + y_l, dim=-1, keepdim=True + ) # cents: [B,N,1] + if mask: + confident_mask = torch.ones_like(confident) + confident_mask[confident <= self.threshold] = float("-INF") + rtn = rtn * confident_mask + if self.confidence: + return rtn, confident + else: + return rtn + + def cent_to_f0(self, cent): + return 10.0 * 2 ** (cent / 1200.0) + + def f0_to_cent(self, f0): + return 1200.0 * torch.log2(f0 / 10.0) + + def gaussian_blurred_cent(self, cents): # cents: [B,N,1] + mask = (cents > 0.1) & (cents < (1200.0 * np.log2(self.f0_max / 10.0))) + B, N, _ = cents.size() + ci = self.cent_table[None, None, :].expand(B, N, -1) + return torch.exp(-torch.square(ci - cents) / 1250) * mask.float() + + +class FCPEInfer: + def __init__(self, model_path, device=None, dtype=torch.float32): + if device is None: + device = "cuda" if torch.cuda.is_available() else "cpu" + self.device = device + ckpt = torch.load(model_path, map_location=torch.device(self.device)) + self.args = DotDict(ckpt["config"]) + self.dtype = dtype + model = FCPE( + input_channel=self.args.model.input_channel, + out_dims=self.args.model.out_dims, + n_layers=self.args.model.n_layers, + n_chans=self.args.model.n_chans, + use_siren=self.args.model.use_siren, + use_full=self.args.model.use_full, + loss_mse_scale=self.args.loss.loss_mse_scale, + loss_l2_regularization=self.args.loss.loss_l2_regularization, + loss_l2_regularization_scale=self.args.loss.loss_l2_regularization_scale, + loss_grad1_mse=self.args.loss.loss_grad1_mse, + loss_grad1_mse_scale=self.args.loss.loss_grad1_mse_scale, + f0_max=self.args.model.f0_max, + f0_min=self.args.model.f0_min, + confidence=self.args.model.confidence, + ) + model.to(self.device).to(self.dtype) + model.load_state_dict(ckpt["model"]) + model.eval() + self.model = model + self.wav2mel = Wav2Mel(self.args, dtype=self.dtype, device=self.device) + + @torch.no_grad() + def __call__(self, audio, sr, threshold=0.05): + self.model.threshold = threshold + audio = audio[None, :] + mel = self.wav2mel(audio=audio, sample_rate=sr).to(self.dtype) + f0 = self.model(mel=mel, infer=True, return_hz_f0=True) + return f0 + + +class Wav2Mel: + + def __init__(self, args, device=None, dtype=torch.float32): + # self.args = args + self.sampling_rate 
= args.mel.sampling_rate + self.hop_size = args.mel.hop_size + if device is None: + device = "cuda" if torch.cuda.is_available() else "cpu" + self.device = device + self.dtype = dtype + self.stft = STFT( + args.mel.sampling_rate, + args.mel.num_mels, + args.mel.n_fft, + args.mel.win_size, + args.mel.hop_size, + args.mel.fmin, + args.mel.fmax, + ) + self.resample_kernel = {} + + def extract_nvstft(self, audio, keyshift=0, train=False): + mel = self.stft.get_mel(audio, keyshift=keyshift, train=train).transpose( + 1, 2 + ) # B, n_frames, bins + return mel + + def extract_mel(self, audio, sample_rate, keyshift=0, train=False): + audio = audio.to(self.dtype).to(self.device) + # resample + if sample_rate == self.sampling_rate: + audio_res = audio + else: + key_str = str(sample_rate) + if key_str not in self.resample_kernel: + self.resample_kernel[key_str] = Resample( + sample_rate, self.sampling_rate, lowpass_filter_width=128 + ) + self.resample_kernel[key_str] = ( + self.resample_kernel[key_str].to(self.dtype).to(self.device) + ) + audio_res = self.resample_kernel[key_str](audio) + + # extract + mel = self.extract_nvstft( + audio_res, keyshift=keyshift, train=train + ) # B, n_frames, bins + n_frames = int(audio.shape[1] // self.hop_size) + 1 + if n_frames > int(mel.shape[1]): + mel = torch.cat((mel, mel[:, -1:, :]), 1) + if n_frames < int(mel.shape[1]): + mel = mel[:, :n_frames, :] + return mel + + def __call__(self, audio, sample_rate, keyshift=0, train=False): + return self.extract_mel(audio, sample_rate, keyshift=keyshift, train=train) + + +class DotDict(dict): + def __getattr__(*args): + val = dict.get(*args) + return DotDict(val) if type(val) is dict else val + + __setattr__ = dict.__setitem__ + __delattr__ = dict.__delitem__ + + +class F0Predictor(object): + def compute_f0(self, wav, p_len): + """ + input: wav:[signal_length] + p_len:int + output: f0:[signal_length//hop_length] + """ + pass + + def compute_f0_uv(self, wav, p_len): + """ + input: wav:[signal_length] + p_len:int + output: f0:[signal_length//hop_length],uv:[signal_length//hop_length] + """ + pass + + +class FCPEF0Predictor(F0Predictor): + def __init__( + self, + model_path, + hop_length=512, + f0_min=50, + f0_max=1100, + dtype=torch.float32, + device=None, + sampling_rate=44100, + threshold=0.05, + ): + self.fcpe = FCPEInfer(model_path, device=device, dtype=dtype) + self.hop_length = hop_length + self.f0_min = f0_min + self.f0_max = f0_max + if device is None: + self.device = "cuda" if torch.cuda.is_available() else "cpu" + else: + self.device = device + self.threshold = threshold + self.sampling_rate = sampling_rate + self.dtype = dtype + self.name = "fcpe" + + def repeat_expand( + self, + content: Union[torch.Tensor, np.ndarray], + target_len: int, + mode: str = "nearest", + ): + ndim = content.ndim + + if content.ndim == 1: + content = content[None, None] + elif content.ndim == 2: + content = content[None] + + assert content.ndim == 3 + + is_np = isinstance(content, np.ndarray) + if is_np: + content = torch.from_numpy(content) + + results = torch.nn.functional.interpolate(content, size=target_len, mode=mode) + + if is_np: + results = results.numpy() + + if ndim == 1: + return results[0, 0] + elif ndim == 2: + return results[0] + + def post_process(self, x, sampling_rate, f0, pad_to): + if isinstance(f0, np.ndarray): + f0 = torch.from_numpy(f0).float().to(x.device) + + if pad_to is None: + return f0 + + f0 = self.repeat_expand(f0, pad_to) + + vuv_vector = torch.zeros_like(f0) + vuv_vector[f0 > 0.0] = 1.0 + vuv_vector[f0 <= 
0.0] = 0.0 + + # 去掉0频率, 并线性插值 + nzindex = torch.nonzero(f0).squeeze() + f0 = torch.index_select(f0, dim=0, index=nzindex).cpu().numpy() + time_org = self.hop_length / sampling_rate * nzindex.cpu().numpy() + time_frame = np.arange(pad_to) * self.hop_length / sampling_rate + + vuv_vector = F.interpolate(vuv_vector[None, None, :], size=pad_to)[0][0] + + if f0.shape[0] <= 0: + return ( + torch.zeros(pad_to, dtype=torch.float, device=x.device).cpu().numpy(), + vuv_vector.cpu().numpy(), + ) + if f0.shape[0] == 1: + return ( + torch.ones(pad_to, dtype=torch.float, device=x.device) * f0[0] + ).cpu().numpy(), vuv_vector.cpu().numpy() + + # 大概可以用 torch 重写? + f0 = np.interp(time_frame, time_org, f0, left=f0[0], right=f0[-1]) + # vuv_vector = np.ceil(scipy.ndimage.zoom(vuv_vector,pad_to/len(vuv_vector),order = 0)) + + return f0, vuv_vector.cpu().numpy() + + def compute_f0(self, wav, p_len=None): + x = torch.FloatTensor(wav).to(self.dtype).to(self.device) + if p_len is None: + print("fcpe p_len is None") + p_len = x.shape[0] // self.hop_length + f0 = self.fcpe(x, sr=self.sampling_rate, threshold=self.threshold)[0, :, 0] + if torch.all(f0 == 0): + rtn = f0.cpu().numpy() if p_len is None else np.zeros(p_len) + return rtn, rtn + return self.post_process(x, self.sampling_rate, f0, p_len)[0] + + def compute_f0_uv(self, wav, p_len=None): + x = torch.FloatTensor(wav).to(self.dtype).to(self.device) + if p_len is None: + p_len = x.shape[0] // self.hop_length + f0 = self.fcpe(x, sr=self.sampling_rate, threshold=self.threshold)[0, :, 0] + if torch.all(f0 == 0): + rtn = f0.cpu().numpy() if p_len is None else np.zeros(p_len) + return rtn, rtn + return self.post_process(x, self.sampling_rate, f0, p_len) diff --git a/rmvpe.py b/predictor/rmvpe.py similarity index 85% rename from rmvpe.py rename to predictor/rmvpe.py index 3ad34614..063e6c02 100644 --- a/rmvpe.py +++ b/predictor/rmvpe.py @@ -1,7 +1,7 @@ -import sys, torch, numpy as np, traceback, pdb import torch.nn as nn -from time import time as ttime +import torch, numpy as np import torch.nn.functional as F +from librosa.filters import mel class BiGRU(nn.Module): @@ -245,10 +245,6 @@ def __init__( nn.Dropout(0.25), nn.Sigmoid(), ) - else: - self.fc = nn.Sequential( - nn.Linear(3 * N_MELS, N_CLASS), nn.Dropout(0.25), nn.Sigmoid() - ) def forward(self, mel): mel = mel.transpose(-1, -2).unsqueeze(1) @@ -257,9 +253,6 @@ def forward(self, mel): return x -from librosa.filters import mel - - class MelSpectrogram(torch.nn.Module): def __init__( self, @@ -362,33 +355,21 @@ def decode(self, hidden, thred=0.03): cents_pred = self.to_local_average_cents(hidden, thred=thred) f0 = 10 * (2 ** (cents_pred / 1200)) f0[f0 == 10] = 0 - # f0 = np.array([10 * (2 ** (cent_pred / 1200)) if cent_pred else 0 for cent_pred in cents_pred]) return f0 def infer_from_audio(self, audio, thred=0.03): audio = torch.from_numpy(audio).float().to(self.device).unsqueeze(0) - # torch.cuda.synchronize() - # t0=ttime() mel = self.mel_extractor(audio, center=True) - # torch.cuda.synchronize() - # t1=ttime() hidden = self.mel2hidden(mel) - # torch.cuda.synchronize() - # t2=ttime() hidden = hidden.squeeze(0).cpu().numpy() if self.is_half == True: hidden = hidden.astype("float32") f0 = self.decode(hidden, thred=thred) - # torch.cuda.synchronize() - # t3=ttime() - # print("hmvpe:%s\t%s\t%s\t%s"%(t1-t0,t2-t1,t3-t2,t3-t0)) return f0 def to_local_average_cents(self, salience, thred=0.05): - # t0 = ttime() - center = np.argmax(salience, axis=1) # 帧长#index - salience = np.pad(salience, ((0, 0), (4, 4))) # 帧长,368 
- # t1 = ttime() + center = np.argmax(salience, axis=1) + salience = np.pad(salience, ((0, 0), (4, 4))) center += 4 todo_salience = [] todo_cents_mapping = [] @@ -397,36 +378,22 @@ def to_local_average_cents(self, salience, thred=0.05): for idx in range(salience.shape[0]): todo_salience.append(salience[:, starts[idx] : ends[idx]][idx]) todo_cents_mapping.append(self.cents_mapping[starts[idx] : ends[idx]]) - # t2 = ttime() - todo_salience = np.array(todo_salience) # 帧长,9 - todo_cents_mapping = np.array(todo_cents_mapping) # 帧长,9 + todo_salience = np.array(todo_salience) + todo_cents_mapping = np.array(todo_cents_mapping) product_sum = np.sum(todo_salience * todo_cents_mapping, 1) - weight_sum = np.sum(todo_salience, 1) # 帧长 - devided = product_sum / weight_sum # 帧长 - # t3 = ttime() - maxx = np.max(salience, axis=1) # 帧长 + weight_sum = np.sum(todo_salience, 1) + devided = product_sum / weight_sum + maxx = np.max(salience, axis=1) devided[maxx <= thred] = 0 - # t4 = ttime() - # print("decode:%s\t%s\t%s\t%s" % (t1 - t0, t2 - t1, t3 - t2, t4 - t3)) return devided - -# if __name__ == '__main__': -# audio, sampling_rate = sf.read("卢本伟语录~1.wav") -# if len(audio.shape) > 1: -# audio = librosa.to_mono(audio.transpose(1, 0)) -# audio_bak = audio.copy() -# if sampling_rate != 16000: -# audio = librosa.resample(audio, orig_sr=sampling_rate, target_sr=16000) -# model_path = "/bili-coeus/jupyter/jupyterhub-liujing04/vits_ch/test-RMVPE/weights/rmvpe_llc_half.pt" -# thred = 0.03 # 0.01 -# device = 'cuda' if torch.cuda.is_available() else 'cpu' -# rmvpe = RMVPE(model_path,is_half=False, device=device) -# t0=ttime() -# f0 = rmvpe.infer_from_audio(audio, thred=thred) -# f0 = rmvpe.infer_from_audio(audio, thred=thred) -# f0 = rmvpe.infer_from_audio(audio, thred=thred) -# f0 = rmvpe.infer_from_audio(audio, thred=thred) -# f0 = rmvpe.infer_from_audio(audio, thred=thred) -# t1=ttime() -# print(f0.shape,t1-t0) + def infer_from_audio_with_pitch(self, audio, thred=0.03, f0_min=50, f0_max=1100): + audio = torch.from_numpy(audio).float().to(self.device).unsqueeze(0) + mel = self.mel_extractor(audio, center=True) + hidden = self.mel2hidden(mel) + hidden = hidden.squeeze(0).cpu().numpy() + if self.is_half == True: + hidden = hidden.astype("float32") + f0 = self.decode(hidden, thred=thred) + f0[(f0 < f0_min) | (f0 > f0_max)] = 0 + return f0 diff --git a/requirements.txt b/requirements.txt index adc8f169..8a0cdc89 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,21 +1,37 @@ +# Build tools wheel setuptools + +# Audio processing ffmpeg +pyworld +soundfile +praat-parselmouth +torchcrepe +audio-separator[gpu]==0.28.5 + +# Libraries for machine learning and AI numba fairseq faiss-cpu +torch +einops +local-attention + +# Web and async gradio==3.40.0 -pyworld -soundfile -praat-parselmouth httpx -tensorboardX -torchcrepe asyncio + +# Text-to-speech edge-tts + +# Media handling yt_dlp rarfile mega.py gdown -audio-separator[gpu]==0.28.5 + +# Data visualization and logging +tensorboardX aria2 diff --git a/vc_infer_pipeline.py b/vc_infer_pipeline.py index 82c15f59..05025db5 100644 --- a/vc_infer_pipeline.py +++ b/vc_infer_pipeline.py @@ -1,14 +1,22 @@ +from functools import lru_cache import numpy as np, parselmouth, torch, pdb, sys, os from time import time as ttime import torch.nn.functional as F -import scipy.signal as signal -import pyworld, os, traceback, faiss, librosa, torchcrepe +import torchcrepe from scipy import signal -from functools import lru_cache +from torch import Tensor +import pyworld, os, faiss, librosa, 
torchcrepe +import random +import gc +import re -now_dir = os.getcwd() +BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +now_dir = os.path.join(BASE_DIR) sys.path.append(now_dir) +from predictor.fcpe import FCPEF0Predictor +from predictor.rmvpe import RMVPE + bh, ah = signal.butter(N=5, Wn=48, btype="high", fs=16000) input_audio_path2wav = {} @@ -28,21 +36,22 @@ def cache_harvest_f0(input_audio_path, fs, f0max, f0min, frame_period): return f0 -def change_rms(data1, sr1, data2, sr2, rate): # 1是输入音频,2是输出音频,rate是2的占比 +def change_rms(data1, sr1, data2, sr2, rate): # print(data1.max(),data2.max()) - rms1 = librosa.feature.rms( - y=data1, frame_length=sr1 // 2 * 2, hop_length=sr1 // 2 - ) # 每半秒一个点 + rms1 = librosa.feature.rms(y=data1, frame_length=sr1 // 2 * 2, hop_length=sr1 // 2) rms2 = librosa.feature.rms(y=data2, frame_length=sr2 // 2 * 2, hop_length=sr2 // 2) + rms1 = torch.from_numpy(rms1) rms1 = F.interpolate( rms1.unsqueeze(0), size=data2.shape[0], mode="linear" ).squeeze() + rms2 = torch.from_numpy(rms2) rms2 = F.interpolate( rms2.unsqueeze(0), size=data2.shape[0], mode="linear" ).squeeze() rms2 = torch.max(rms2, torch.zeros_like(rms2) + 1e-6) + data2 *= ( torch.pow(rms1, torch.tensor(1 - rate)) * torch.pow(rms2, torch.tensor(rate - 1)) @@ -59,15 +68,178 @@ def __init__(self, tgt_sr, config): config.x_max, config.is_half, ) - self.sr = 16000 # hubert输入采样率 - self.window = 160 # 每帧点数 - self.t_pad = self.sr * self.x_pad # 每条前后pad时间 + self.sr = 16000 + self.window = 160 + self.t_pad = self.sr * self.x_pad self.t_pad_tgt = tgt_sr * self.x_pad self.t_pad2 = self.t_pad * 2 - self.t_query = self.sr * self.x_query # 查询切点前后查询时间 - self.t_center = self.sr * self.x_center # 查询切点位置 - self.t_max = self.sr * self.x_max # 免查询时长阈值 + self.t_query = self.sr * self.x_query + self.t_center = self.sr * self.x_center + self.t_max = self.sr * self.x_max self.device = config.device + + self.ref_freqs = [ + 65.41, + 82.41, + 110.00, + 146.83, + 196.00, + 246.94, + 329.63, + 440.00, + 587.33, + 783.99, + 1046.50, + ] + self.note_dict = self.generate_interpolated_frequencies() + + def generate_interpolated_frequencies(self): + note_dict = [] + for i in range(len(self.ref_freqs) - 1): + freq_low = self.ref_freqs[i] + freq_high = self.ref_freqs[i + 1] + interpolated_freqs = np.linspace( + freq_low, freq_high, num=10, endpoint=False + ) + note_dict.extend(interpolated_freqs) + note_dict.append(self.ref_freqs[-1]) + return note_dict + + def autotune_f0(self, f0): + autotuned_f0 = np.zeros_like(f0) + for i, freq in enumerate(f0): + closest_note = min(self.note_dict, key=lambda x: abs(x - freq)) + autotuned_f0[i] = closest_note + return autotuned_f0 + + def get_optimal_torch_device(self, index: int = 0) -> torch.device: + if torch.cuda.is_available(): + return torch.device(f"cuda:{index % torch.cuda.device_count()}") + elif torch.backends.mps.is_available(): + return torch.device("mps") + return torch.device("cpu") + + def get_f0_crepe_computation( + self, + x, + f0_min, + f0_max, + p_len, + hop_length=160, + model="full", + ): + x = x.astype(np.float32) + x /= np.quantile(np.abs(x), 0.999) + torch_device = self.get_optimal_torch_device() + audio = torch.from_numpy(x).to(torch_device, copy=True) + audio = torch.unsqueeze(audio, dim=0) + if audio.ndim == 2 and audio.shape[0] > 1: + audio = torch.mean(audio, dim=0, keepdim=True).detach() + audio = audio.detach() + pitch: Tensor = torchcrepe.predict( + audio, + self.sr, + hop_length, + f0_min, + f0_max, + model, + batch_size=hop_length * 2, + 
device=torch_device, + pad=True, + ) + p_len = p_len or x.shape[0] // hop_length + source = np.array(pitch.squeeze(0).cpu().float().numpy()) + source[source < 0.001] = np.nan + target = np.interp( + np.arange(0, len(source) * p_len, len(source)) / p_len, + np.arange(0, len(source)), + source, + ) + f0 = np.nan_to_num(target) + return f0 + + def get_f0_official_crepe_computation( + self, + x, + f0_min, + f0_max, + model="full", + ): + batch_size = 512 + audio = torch.tensor(np.copy(x))[None].float() + f0, pd = torchcrepe.predict( + audio, + self.sr, + self.window, + f0_min, + f0_max, + model, + batch_size=batch_size, + device=self.device, + return_periodicity=True, + ) + pd = torchcrepe.filter.median(pd, 3) + f0 = torchcrepe.filter.mean(f0, 3) + f0[pd < 0.1] = 0 + f0 = f0[0].cpu().numpy() + return f0 + + def get_f0_hybrid_computation( + self, + methods_str, + input_audio_path, + x, + f0_min, + f0_max, + p_len, + filter_radius, + crepe_hop_length, + time_step, + ): + methods_str = re.search("hybrid\[(.+)\]", methods_str) + if methods_str: + methods = [method.strip() for method in methods_str.group(1).split("+")] + f0_computation_stack = [] + print(f"Calculating f0 pitch estimations for methods {str(methods)}") + x = x.astype(np.float32) + x /= np.quantile(np.abs(x), 0.999) + for method in methods: + f0 = None + if method == "mangio-crepe": + f0 = self.get_f0_crepe_computation( + x, f0_min, f0_max, p_len, crepe_hop_length + ) + elif method == "rmvpe": + if hasattr(self, "model_rmvpe") == False: + + self.model_rmvpe = RMVPE( + os.path.join(BASE_DIR, 'rmvpe.pt'), is_half=self.is_half, device=self.device + ) + f0 = self.model_rmvpe.infer_from_audio(x, thred=0.03) + f0 = f0[1:] + elif method == "fcpe": + self.model_fcpe = FCPEF0Predictor( + os.path.join(BASE_DIR, 'fcpe.pt'), + f0_min=int(f0_min), + f0_max=int(f0_max), + dtype=torch.float32, + device=self.device, + sampling_rate=self.sr, + threshold=0.03, + ) + f0 = self.model_fcpe.compute_f0(x, p_len=p_len) + del self.model_fcpe + gc.collect() + f0_computation_stack.append(f0) + + print(f"Calculating hybrid median f0 from the stack of {str(methods)}") + f0_computation_stack = [fc for fc in f0_computation_stack if fc is not None] + f0_median_hybrid = None + if len(f0_computation_stack) == 1: + f0_median_hybrid = f0_computation_stack[0] + else: + f0_median_hybrid = np.nanmedian(f0_computation_stack, axis=0) + return f0_median_hybrid def get_f0( self, @@ -77,12 +249,16 @@ def get_f0( f0_up_key, f0_method, filter_radius, + crepe_hop_length, + f0autotune, inp_f0=None, + f0_min=50, + f0_max=1100, ): global input_audio_path2wav time_step = self.window / self.sr * 1000 - f0_min = 50 - f0_max = 1100 + #f0_min = 50 + #f0_max = 1100 f0_mel_min = 1127 * np.log(1 + f0_min / 700) f0_mel_max = 1127 * np.log(1 + f0_max / 700) if f0_method == "pm": @@ -98,47 +274,77 @@ def get_f0( ) pad_size = (p_len - len(f0) + 1) // 2 if pad_size > 0 or p_len - len(f0) - pad_size > 0: - f0 = np.pad( - f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant" - ) + f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant") + elif f0_method == "harvest": input_audio_path2wav[input_audio_path] = x.astype(np.double) f0 = cache_harvest_f0(input_audio_path, self.sr, f0_max, f0_min, 10) - if filter_radius > 2: + if int(filter_radius) > 2: f0 = signal.medfilt(f0, 3) - elif f0_method == "crepe": - model = "full" - # Pick a batch size that doesn't cause memory errors on your gpu - batch_size = 512 - # Compute pitch using first gpu - audio = 
torch.tensor(np.copy(x))[None].float() - f0, pd = torchcrepe.predict( - audio, - self.sr, - self.window, - f0_min, - f0_max, - model, - batch_size=batch_size, - device=self.device, - return_periodicity=True, + + elif f0_method == "dio": + f0, t = pyworld.dio( + x.astype(np.double), + fs=self.sr, + f0_ceil=f0_max, + f0_floor=f0_min, + frame_period=10, ) - pd = torchcrepe.filter.median(pd, 3) - f0 = torchcrepe.filter.mean(f0, 3) - f0[pd < 0.1] = 0 - f0 = f0[0].cpu().numpy() + f0 = pyworld.stonemask(x.astype(np.double), f0, t, self.sr) + f0 = signal.medfilt(f0, 3) + + elif f0_method == "mangio-crepe": + f0 = self.get_f0_crepe_computation(x, f0_min, f0_max, p_len, crepe_hop_length) + elif f0_method == "rmvpe": if hasattr(self, "model_rmvpe") == False: - from rmvpe import RMVPE - print("loading rmvpe model") self.model_rmvpe = RMVPE( - "rmvpe.pt", is_half=self.is_half, device=self.device + os.path.join(BASE_DIR, 'rvc_models', 'rmvpe.pt'), is_half=self.is_half, device=self.device ) f0 = self.model_rmvpe.infer_from_audio(x, thred=0.03) + + elif f0_method == "rmvpe+": + params = {'x': x, 'p_len': p_len, 'f0_up_key': f0_up_key, 'f0_min': f0_min, + 'f0_max': f0_max, 'time_step': time_step, 'filter_radius': filter_radius, + 'crepe_hop_length': crepe_hop_length, 'model': "full" + } + f0 = self.get_pitch_dependant_rmvpe(**params) + + elif f0_method == "fcpe": + self.model_fcpe = FCPEF0Predictor( + os.path.join(BASE_DIR, 'fcpe.pt'), + f0_min=int(f0_min), + f0_max=int(f0_max), + dtype=torch.float32, + device=self.device, + sampling_rate=self.sr, + threshold=0.03, + ) + f0 = self.model_fcpe.compute_f0(x, p_len=p_len) + del self.model_fcpe + gc.collect() + + elif "hybrid" in f0_method: + input_audio_path2wav[input_audio_path] = x.astype(np.double) + f0 = self.get_f0_hybrid_computation( + f0_method, + input_audio_path, + x, + f0_min, + f0_max, + p_len, + filter_radius, + crepe_hop_length, + time_step, + ) + + print("f0_autotune =", f0autotune) + if f0autotune == "True": + f0 = self.autotune_f0(f0) + f0 *= pow(2, f0_up_key / 12) - # with open("test.txt","w")as f:f.write("\n".join([str(i)for i in f0.tolist()])) - tf0 = self.sr // self.window # 每秒f0点数 + tf0 = self.sr // self.window if inp_f0 is not None: delta_t = np.round( (inp_f0[:, 0].max() - inp_f0[:, 0].min()) * tf0 + 1 @@ -150,7 +356,6 @@ def get_f0( f0[self.x_pad * tf0 : self.x_pad * tf0 + len(replace_f0)] = replace_f0[ :shape ] - # with open("test_opt.txt","w")as f:f.write("\n".join([str(i)for i in f0.tolist()])) f0bak = f0.copy() f0_mel = 1127 * np.log(1 + f0 / 700) f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * 254 / ( @@ -158,9 +363,24 @@ def get_f0( ) + 1 f0_mel[f0_mel <= 1] = 1 f0_mel[f0_mel > 255] = 255 - f0_coarse = np.rint(f0_mel).astype(np.int) - return f0_coarse, f0bak # 1-0 + f0_coarse = np.rint(f0_mel).astype(np.int_) + + return f0_coarse, f0bak + + def get_pitch_dependant_rmvpe(self, x, f0_min=1, f0_max=40000, *args, **kwargs): + if not hasattr(self, "model_rmvpe"): + + self.model_rmvpe = RMVPE( + os.path.join(BASE_DIR, 'rmvpe.pt'), + is_half=self.is_half, + device=self.device, + ) + + f0 = self.model_rmvpe.infer_from_audio_with_pitch(x, thred=0.03, f0_min=f0_min, f0_max=f0_max) + + return f0 + def vc( self, model, @@ -175,13 +395,13 @@ def vc( index_rate, version, protect, - ): # ,file_index,file_big_npy + ): feats = torch.from_numpy(audio0) if self.is_half: feats = feats.half() else: feats = feats.float() - if feats.dim() == 2: # double channels + if feats.dim() == 2: feats = feats.mean(-1) assert feats.dim() == 1, feats.dim() 
feats = feats.view(1, -1) @@ -207,9 +427,6 @@ def vc( if self.is_half: npy = npy.astype("float32") - # _, I = index.search(npy, 1) - # npy = big_npy[I.squeeze()] - score, ix = index.search(npy, k=8) weight = np.square(1 / score) weight /= weight.sum(axis=1, keepdims=True) @@ -274,7 +491,6 @@ def pipeline( f0_up_key, f0_method, file_index, - # file_big_npy, index_rate, if_f0, filter_radius, @@ -283,21 +499,18 @@ def pipeline( rms_mix_rate, version, protect, + crepe_hop_length, + f0autotune, f0_file=None, + f0_min=50, + f0_max=1100, ): - if ( - file_index != "" - # and file_big_npy != "" - # and os.path.exists(file_big_npy) == True - and os.path.exists(file_index) == True - and index_rate != 0 - ): + if file_index != "" and os.path.exists(file_index) == True and index_rate != 0: try: index = faiss.read_index(file_index) - # big_npy = np.load(file_big_npy) big_npy = index.reconstruct_n(0, index.ntotal) - except: - traceback.print_exc() + except Exception as error: + print(error) index = big_npy = None else: index = big_npy = None @@ -332,8 +545,8 @@ def pipeline( for line in lines: inp_f0.append([float(i) for i in line.split(",")]) inp_f0 = np.array(inp_f0, dtype="float32") - except: - traceback.print_exc() + except Exception as error: + print(error) sid = torch.tensor(sid, device=self.device).unsqueeze(0).long() pitch, pitchf = None, None if if_f0 == 1: @@ -344,7 +557,11 @@ def pipeline( f0_up_key, f0_method, filter_radius, + crepe_hop_length, + f0autotune, inp_f0, + f0_min, + f0_max, ) pitch = pitch[:p_len] pitchf = pitchf[:p_len]