from typing import TYPE_CHECKING, Any, Dict, List, Optional
|
|
|
from ..integrations import prepare_for_hqq_linear |
|
from ..utils import is_accelerate_available, is_hqq_available, is_torch_available, logging |
|
from .base import HfQuantizer |
|
from .quantizers_utils import get_module_from_name |
|
|
|
|
|
if TYPE_CHECKING: |
|
from ..modeling_utils import PreTrainedModel |
|
|
|
|
|
if is_accelerate_available(): |
|
from accelerate.hooks import remove_hook_from_module |
|
|
|
if is_torch_available(): |
|
import torch |
|
|
|
logger = logging.get_logger(__name__) |
|
|
|
|
|
|
|
def find_parent(model, name): |
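    """Return the parent module of the (sub)module referenced by the dotted path `name`."""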
|
module_tree = name.split(".")[:-1] |
|
parent = model |
|
for m in module_tree: |
|
parent = parent._modules[m] |
|
return parent |
|
|
|
|
|
class HqqHfQuantizer(HfQuantizer): |
|
""" |
|
    HF quantizer class for the HQQ (Half-Quadratic Quantization) method.

    nn.Linear modules are first tagged with their quant_config in _process_model_before_weight_loading().

    The actual quantization and offloading to the GPU happen in create_quantized_param().
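
    Example (a minimal usage sketch, assuming the `hqq` package is installed and a CUDA device is
    available; the checkpoint id is a placeholder):

        import torch
        from transformers import AutoModelForCausalLM, HqqConfig

        quant_config = HqqConfig(nbits=4, group_size=64)
        model = AutoModelForCausalLM.from_pretrained(
            "org/model-id",  # placeholder checkpoint
            torch_dtype=torch.float16,
            device_map="cuda",
            quantization_config=quant_config,
        )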
|
""" |
|
|
|
use_keep_in_fp32_modules = False |
|
requires_parameters_quantization = True |
|
requires_calibration = False |
|
required_packages = ["hqq"] |
|
|
|
def __init__(self, quantization_config, **kwargs): |
|
super().__init__(quantization_config, **kwargs) |
|
self.torch_dtype = None |
|
self.using_multi_gpu = False |
|
|
|
def validate_environment(self, *args, **kwargs): |
|
        if not is_hqq_available():
|
raise ImportError( |
|
"A valid HQQ version (>=0.2.1) is not available. Please follow the instructions to install it: `https://github.com/mobiusml/hqq/`." |
|
) |
|
|
|
if kwargs.get("from_tf", False) or kwargs.get("from_flax", False): |
|
raise ValueError( |
|
"Converting weights from tf/flax weights is currently not supported, please make" |
|
" sure the weights are in PyTorch format." |
|
) |
|
|
|
if not torch.cuda.is_available(): |
|
raise RuntimeError("No GPU found. A GPU is needed for quantization.") |
|
|
|
if self.torch_dtype is None: |
|
if "torch_dtype" in kwargs: |
|
self.torch_dtype = kwargs["torch_dtype"] |
|
else: |
|
self.torch_dtype = torch.float32 |
|
logger.info("Setting torch_dtype to torch.float32 as the default value since it was not specified.") |
|
|
|
device_map = kwargs.get("device_map", None) |
|
if isinstance(device_map, dict): |
|
if "cpu" in device_map.values() or "disk" in device_map.values(): |
|
raise ValueError( |
|
"You are attempting to use an HQQ model with a device_map that contains a CPU or disk device." |
|
" This is not supported. Please remove the CPU or disk device from the device_map." |
|
) |
|
else: |
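                # Weights spread across several GPUs: each HQQLinear forward will be patched later
                # in _patch_layer_for_multigpu() so inputs are moved to the layer's own device.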
|
self.using_multi_gpu = len(set(device_map.values())) > 1 |
|
|
|
def update_missing_keys( |
|
self, model: "PreTrainedModel", missing_keys: List[str], prefix: str, **kwargs |
|
) -> List[str]: |
|
if self.pre_quantized: |
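            # When loading a pre-quantized model, the raw `weight` tensors are not expected in the
            # checkpoint (they are stored in HQQ's packed format), so do not report them as missing.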
|
return [key for key in missing_keys if ("weight" not in key)] |
|
else: |
|
return missing_keys |
|
|
|
|
|
def update_expected_keys( |
|
self, model: "PreTrainedModel", expected_keys: List[str], loaded_keys: List[str] |
|
) -> List[str]: |
|
if not self.pre_quantized: |
|
return expected_keys |
|
|
|
|
|
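        # Recursively collect the (fully qualified) names of all nn.Linear children that HQQ can quantize.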
def _find_hqq_quantizable_layers(model, layers): |
|
for name, module in model.named_children(): |
|
                if isinstance(module, torch.nn.Linear):
|
layers.add(module.name) |
|
_find_hqq_quantizable_layers(module, layers) |
|
|
|
new_keys = set(expected_keys) |
|
if is_hqq_available(): |
|
from hqq.core.quantize import HQQLinear |
|
|
|
|
|
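            # Tag each module with its fully qualified name so the helper above can record it.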
for name, module in model.named_modules(): |
|
module.name = name |
|
|
|
|
|
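            # Quantizable modules are Linear layers, minus the ones listed in the config's skip_modules.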
_valid_modules = set() |
|
_find_hqq_quantizable_layers(model, _valid_modules) |
|
_valid_modules -= set(model.config.quantization_config["skip_modules"]) |
|
|
|
|
|
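            # Reference state-dict keys of a bare HQQLinear layer (bias excluded); these are the
            # serialized tensors expected for each quantized module.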
_ref_keys = HQQLinear( |
|
linear_layer=None, quant_config=None, compute_dtype=torch.float16, device="cpu" |
|
).state_dict_keys() - {"bias"} |
|
|
|
|
|
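            # Drop the original keys of quantizable modules; they are re-added below in the proper format.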
_rm_keys = set() |
|
for key in new_keys: |
|
if any(_module in key for _module in _valid_modules): |
|
_rm_keys.add(key) |
|
new_keys -= _rm_keys |
|
|
|
|
|
|
|
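            # If the checkpoint still holds a raw weight, keep it (it will be quantized at load time);
            # otherwise expect the serialized HQQ tensors. Biases are kept whenever present.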
for _module in _valid_modules: |
|
if _module + ".weight" in loaded_keys: |
|
new_keys.add(_module + ".weight") |
|
else: |
|
new_keys.update({_module + "." + _ref_key for _ref_key in _ref_keys}) |
|
if _module + ".bias" in loaded_keys: |
|
new_keys.add(_module + ".bias") |
|
|
|
return list(new_keys) |
|
|
|
def check_quantized_param( |
|
self, |
|
model: "PreTrainedModel", |
|
param_value: "torch.Tensor", |
|
param_name: str, |
|
state_dict: Dict[str, Any], |
|
**kwargs, |
|
) -> bool: |
|
if is_hqq_available(): |
|
from hqq.core.quantize import HQQLinear |
|
module, tensor_name = get_module_from_name(model, param_name) |
|
|
|
if self.pre_quantized: |
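            # Pre-quantized checkpoints: every serialized HQQ tensor except weight/bias must be routed
            # through create_quantized_param().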
|
return ( |
|
(isinstance(module, torch.nn.Linear) or isinstance(module, HQQLinear)) |
|
and tensor_name != "weight" |
|
and tensor_name != "bias" |
|
) |
|
else: |
|
return isinstance(module, torch.nn.Linear) and tensor_name == "weight" |
|
|
|
def create_quantized_param( |
|
self, |
|
model: "PreTrainedModel", |
|
param_value: "torch.Tensor", |
|
param_name: str, |
|
target_device: "torch.device", |
|
state_dict: Dict[str, Any], |
|
unexpected_keys: List[str], |
|
): |
|
""" |
|
        Each nn.Linear layer is processed here.

        We first check whether the corresponding module's state_dict already contains HQQ-quantized parameters.

        If not, we create a temporary nn.Linear layer from the module's state_dict params and use it for quantization.
|
""" |
|
|
|
if is_hqq_available(): |
|
from hqq.core.quantize import HQQLinear |
|
|
|
module, tensor_name = get_module_from_name(model, param_name) |
|
layer_name = ".".join(param_name.split(".")[:-1]) |
|
parent_module = find_parent(model, layer_name) |
|
node = layer_name.split(".")[-1] |
|
|
|
|
|
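        # Collect all state_dict entries belonging to this layer, keyed by their local tensor name.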
module_state_dict = {} |
|
for k, v in state_dict.items(): |
|
if layer_name + "." in k: |
|
module_state_dict[k.split(".")[-1]] = v |
|
if unexpected_keys is not None and k in unexpected_keys: |
|
unexpected_keys.remove(k) |
|
|
|
if self.pre_quantized: |
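            # Pre-quantized path: rebuild the HQQLinear layer directly from its serialized state dict.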
|
if isinstance(module, HQQLinear): |
|
return |
|
else: |
|
hqq_layer = HQQLinear( |
|
linear_layer=None, |
|
quant_config=None, |
|
compute_dtype=self.torch_dtype, |
|
device=target_device, |
|
) |
|
|
|
hqq_layer.load_state_dict(module_state_dict) |
|
|
|
if hqq_layer.bias is not None and isinstance(hqq_layer.bias, torch.Tensor): |
|
hqq_layer.bias = torch.nn.Parameter(hqq_layer.bias) |
|
|
|
if self.using_multi_gpu: |
|
hqq_layer = self._patch_layer_for_multigpu(hqq_layer) |
|
|
|
setattr(parent_module, node, hqq_layer) |
|
|
|
|
|
del module.__dict__, module |
|
torch.cuda.empty_cache() |
|
return |
|
|
|
|
|
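        # On-the-fly path: attach the raw weight/bias to the nn.Linear module before quantizing it.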
for key in module_state_dict: |
|
setattr(module, key, torch.nn.Parameter(module_state_dict[key])) |
|
|
|
|
|
|
|
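        # Modules tagged with a quant_config in _process_model_before_weight_loading() get quantized;
        # the others are simply cast and moved to the target device.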
if hasattr(module, "quant_config"): |
|
hqq_layer = HQQLinear( |
|
module, |
|
module.quant_config, |
|
compute_dtype=self.torch_dtype, |
|
device=target_device, |
|
del_orig=True, |
|
) |
|
|
|
if hqq_layer.bias is not None and isinstance(hqq_layer.bias, torch.Tensor): |
|
hqq_layer.bias = torch.nn.Parameter(hqq_layer.bias) |
|
|
|
if self.using_multi_gpu: |
|
hqq_layer = self._patch_layer_for_multigpu(hqq_layer) |
|
|
|
setattr(parent_module, node, hqq_layer) |
|
|
|
else: |
|
module = module.to(dtype=self.torch_dtype, device=target_device) |
|
setattr(parent_module, node, module) |
|
|
|
torch.cuda.empty_cache() |
|
|
|
|
|
def _patch_layer_for_multigpu(self, hqq_layer): |
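        """Remove the accelerate hook and compute forward() on the layer's own device so that HQQ
        layers dispatched across several GPUs work correctly."""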
|
hqq_layer = remove_hook_from_module(hqq_layer) |
|
|
|
def forward_with_device(self, x): |
|
out = torch.matmul(x.to(self.device), self.dequantize().t()) |
|
if self.bias is not None: |
|
out += self.bias |
|
return out |
|
|
|
hqq_layer.forward = lambda x: forward_with_device(hqq_layer, x) |
|
return hqq_layer |
|
|
|
def _process_model_before_weight_loading( |
|
self, |
|
model: "PreTrainedModel", |
|
device_map, |
|
        keep_in_fp32_modules: Optional[List[str]] = None,
|
**kwargs, |
|
): |
|
keep_in_fp32_modules = keep_in_fp32_modules if keep_in_fp32_modules is not None else [] |
|
|
|
|
|
|
|
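        # Tag the eligible nn.Linear modules with their quant_config; the actual quantization happens
        # later, in create_quantized_param(), once the checkpoint weights are loaded.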
model = prepare_for_hqq_linear(model, quantization_config=self.quantization_config) |
|
|
|
def _process_model_after_weight_loading(self, model: "PreTrainedModel", **kwargs): |
|
model.is_hqq_quantized = True |
|
model.is_hqq_serializable = self.is_serializable() |
|
return model |
|
|
|
def is_serializable(self, safe_serialization=None): |
|
return True |
|
|
|
@property |
|
def is_trainable(self) -> bool: |
|
return True |
|
|