# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Adapted from:
# https://github.com/ZFTurbo/Weighted-Boxes-Fusion/blob/master/ensemble_boxes/ensemble_boxes_wbf.py
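"""Custom Weighted Boxes Fusion (WBF) implementation.

Merges detections from multiple models and supports a class-agnostic mode and a
"biggest box" fusion in addition to the standard weighted merge.
"""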
import warnings
from typing import Dict, List, Tuple, Union, Literal
import numpy as np
import numpy.typing as npt
def prefilter_boxes(
boxes: List[npt.NDArray[np.float64]],
scores: List[npt.NDArray[np.float64]],
labels: List[npt.NDArray[np.int_]],
weights: List[float],
thr: float,
class_agnostic: bool = False,
) -> Dict[Union[str, int], npt.NDArray[np.float64]]:
"""
Reformats and filters boxes.
Output is a dict of boxes to merge separately.
Args:
boxes (list[np array[n x 4]]): List of boxes. One list per model.
scores (list[np array[n]]): List of confidences.
labels (list[np array[n]]): List of labels.
weights (list): Model weights.
        thr (float): Confidence threshold.
        class_agnostic (bool, optional): Merge boxes from different classes. Defaults to False.
    Returns:
        dict: Filtered boxes of shape [? x 8], keyed by label (or "*" in class-agnostic mode).
"""
    # Create a dict with boxes grouped by label
new_boxes = dict()
for t in range(len(boxes)):
assert len(boxes[t]) == len(scores[t]), "len(boxes) != len(scores)"
assert len(boxes[t]) == len(labels[t]), "len(boxes) != len(labels)"
for j in range(len(boxes[t])):
score = scores[t][j]
if score < thr:
continue
label = int(labels[t][j])
box_part = boxes[t][j]
x1 = float(box_part[0])
y1 = float(box_part[1])
x2 = float(box_part[2])
y2 = float(box_part[3])
# Box data checks
if x2 < x1:
warnings.warn("X2 < X1 value in box. Swap them.")
x1, x2 = x2, x1
if y2 < y1:
warnings.warn("Y2 < Y1 value in box. Swap them.")
y1, y2 = y2, y1
array = np.array([x1, x2, y1, y2])
if array.min() < 0 or array.max() > 1:
warnings.warn("Coordinates outside [0, 1]")
array = np.clip(array, 0, 1)
x1, x2, y1, y2 = array
if (x2 - x1) * (y2 - y1) == 0.0:
warnings.warn("Zero area box skipped: {}.".format(box_part))
continue
            # [label, weighted score, weight, model index, x1, y1, x2, y2]
b = [int(label), float(score) * weights[t], weights[t], t, x1, y1, x2, y2]
label_k = "*" if class_agnostic else label
if label_k not in new_boxes:
new_boxes[label_k] = []
new_boxes[label_k].append(b)
# Sort each list in dict by score and transform it to numpy array
for k in new_boxes:
current_boxes = np.array(new_boxes[k])
new_boxes[k] = current_boxes[current_boxes[:, 1].argsort()[::-1]]
return new_boxes
def merge_labels(
labels: npt.NDArray[np.int_], confs: npt.NDArray[np.float64]
) -> int:
"""
Custom function for merging labels.
If all labels are the same, return the unique value.
    Else, return the label of the most confident box that is not a title (label 2).
Args:
labels (np array [n]): Labels.
confs (np array [n]): Confidence.
Returns:
int: Label.
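    Example (illustrative values):
        merge_labels(np.array([0, 2]), np.array([0.5, 0.9])) returns 0,
        because the title box (label 2) is ignored despite its higher confidence.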
"""
    if len(np.unique(labels)) == 1:
        return labels[0]
    else:  # Most confident box that is not a title (label 2)
        keep = labels != 2
        confs = confs[keep]
        labels = labels[keep]
        return labels[np.argmax(confs)]
def get_weighted_box(
boxes: npt.NDArray[np.float64], conf_type: Literal["avg", "max"] = "avg"
) -> npt.NDArray[np.float64]:
"""
Merges boxes by using the weighted fusion.
Args:
boxes (np array [n x 8]): Boxes to merge.
conf_type (str, optional): Confidence merging type. Defaults to "avg".
Returns:
np array [8]: Merged box.
"""
box = np.zeros(8, dtype=np.float32)
conf = 0
conf_list = []
w = 0
for b in boxes:
box[4:] += b[1] * b[4:]
conf += b[1]
conf_list.append(b[1])
w += b[2]
box[0] = merge_labels(
np.array([b[0] for b in boxes]), np.array([b[1] for b in boxes])
)
box[1] = np.max(conf_list) if conf_type == "max" else np.mean(conf_list)
box[2] = w
box[3] = -1 # model index field is retained for consistency but is not used.
box[4:] /= conf
return box
def get_biggest_box(
boxes: npt.NDArray[np.float64], conf_type: Literal["avg", "max"] = "avg"
) -> npt.NDArray[np.float64]:
"""
Merges boxes by using the biggest box.
Args:
boxes (np array [n x 8]): Boxes to merge.
conf_type (str, optional): Confidence merging type. Defaults to "avg".
Returns:
np array [8]: Merged box.
"""
box = np.zeros(8, dtype=np.float32)
box[4:] = boxes[0][4:]
conf_list = []
w = 0
for b in boxes:
box[4] = min(box[4], b[4])
box[5] = min(box[5], b[5])
box[6] = max(box[6], b[6])
box[7] = max(box[7], b[7])
conf_list.append(b[1])
w += b[2]
box[0] = merge_labels(
np.array([b[0] for b in boxes]), np.array([b[1] for b in boxes])
)
box[1] = np.max(conf_list) if conf_type == "max" else np.mean(conf_list)
box[2] = w
box[3] = -1 # model index field is retained for consistency but is not used.
return box
def find_matching_box_fast(
boxes_list: npt.NDArray[np.float64],
new_box: npt.NDArray[np.float64],
match_iou: float,
) -> Tuple[int, float]:
"""
Reimplementation of find_matching_box with numpy instead of loops.
Gives significant speed up for larger arrays (~100x).
This was previously the bottleneck since the function is called for every entry in the array.
Args:
boxes_list (np.ndarray): Array of boxes with shape (N, 8).
new_box (np.ndarray): New box to match with shape (8,).
match_iou (float): IoU threshold for matching.
Returns:
Tuple[int, float]: Index of best matching box (-1 if no match) and IoU value.
"""
def bb_iou_array(
boxes: npt.NDArray[np.float64], new_box: npt.NDArray[np.float64]
) -> npt.NDArray[np.float64]:
        # Bounding box intersection over union
xA = np.maximum(boxes[:, 0], new_box[0])
yA = np.maximum(boxes[:, 1], new_box[1])
xB = np.minimum(boxes[:, 2], new_box[2])
yB = np.minimum(boxes[:, 3], new_box[3])
interArea = np.maximum(xB - xA, 0) * np.maximum(yB - yA, 0)
# compute the area of both the prediction and ground-truth rectangles
boxAArea = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
boxBArea = (new_box[2] - new_box[0]) * (new_box[3] - new_box[1])
iou = interArea / (boxAArea + boxBArea - interArea)
return iou
if boxes_list.shape[0] == 0:
return -1, match_iou
ious = bb_iou_array(boxes_list[:, 4:], new_box[4:])
# ious[boxes[:, 0] != new_box[0]] = -1
best_idx = np.argmax(ious)
best_iou = ious[best_idx]
if best_iou <= match_iou:
best_iou = match_iou
best_idx = -1
return best_idx, best_iou
def weighted_boxes_fusion(
boxes_list: List[npt.NDArray[np.float64]],
labels_list: List[npt.NDArray[np.int_]],
scores_list: List[npt.NDArray[np.float64]],
iou_thr: float = 0.5,
skip_box_thr: float = 0.0,
conf_type: Literal["avg", "max"] = "avg",
merge_type: Literal["weighted", "biggest"] = "weighted",
class_agnostic: bool = False,
) -> Tuple[npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.int_]]:
"""
Custom WBF implementation that supports a class_agnostic mode and a biggest box fusion.
Boxes are expected to be in normalized (x0, y0, x1, y1) format.
Args:
boxes_list (list[np.ndarray[n x 4]]): List of boxes. One list per model.
labels_list (list[np.ndarray[n]]): List of labels.
scores_list (list[np.ndarray[n]]): List of confidences.
        iou_thr (float, optional): IoU threshold for matching. Defaults to 0.5.
skip_box_thr (float, optional): Exclude boxes with score < skip_box_thr. Defaults to 0.0.
conf_type (str, optional): Confidence merging type ("avg" or "max"). Defaults to "avg".
merge_type (str, optional): Merge type ("weighted" or "biggest"). Defaults to "weighted".
class_agnostic (bool, optional): Merge boxes from different classes. Defaults to False.
Returns:
numpy.ndarray [N x 4]: Array of bounding boxes.
numpy.ndarray [N]: Array of labels.
numpy.ndarray [N]: Array of scores.
"""
    # All models are weighted equally in this implementation
    weights = np.ones(len(boxes_list))
    assert conf_type in ["avg", "max"], 'Conf type must be "avg" or "max"'
    assert merge_type in ["weighted", "biggest"], 'Merge type must be "weighted" or "biggest"'
filtered_boxes = prefilter_boxes(
boxes_list,
scores_list,
labels_list,
weights,
skip_box_thr,
class_agnostic=class_agnostic,
)
if len(filtered_boxes) == 0:
return np.zeros((0, 4)), np.zeros((0,)), np.zeros((0,))
overall_boxes = []
for label in filtered_boxes:
boxes = filtered_boxes[label]
clusters = []
# Clusterize boxes
for j in range(len(boxes)):
ids = [i for i in range(len(boxes)) if i != j]
index, best_iou = find_matching_box_fast(boxes[ids], boxes[j], iou_thr)
if index != -1:
index = ids[index]
cluster_idx = [
clust_idx
for clust_idx, clust in enumerate(clusters)
if (j in clust or index in clust)
]
if len(cluster_idx):
cluster_idx = cluster_idx[0]
clusters[cluster_idx] = list(
set(clusters[cluster_idx] + [index, j])
)
else:
clusters.append([index, j])
else:
clusters.append([j])
for j, c in enumerate(clusters):
if merge_type == "weighted":
weighted_box = get_weighted_box(boxes[c], conf_type)
elif merge_type == "biggest":
weighted_box = get_biggest_box(boxes[c], conf_type)
if conf_type == "max":
weighted_box[1] = weighted_box[1] / weights.max()
else: # avg
weighted_box[1] = weighted_box[1] * len(c) / weights.sum()
overall_boxes.append(weighted_box)
overall_boxes = np.array(overall_boxes)
overall_boxes = overall_boxes[overall_boxes[:, 1].argsort()[::-1]]
boxes = overall_boxes[:, 4:]
scores = overall_boxes[:, 1]
labels = overall_boxes[:, 0]
return boxes, labels, scores
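

if __name__ == "__main__":
    # Minimal usage sketch: fuse toy predictions from two hypothetical models.
    # Boxes are normalized (x0, y0, x1, y1); all values below are illustrative only.
    example_boxes = [
        np.array([[0.10, 0.10, 0.40, 0.40], [0.50, 0.50, 0.90, 0.90]]),
        np.array([[0.12, 0.11, 0.42, 0.41]]),
    ]
    example_labels = [np.array([0, 1]), np.array([0])]
    example_scores = [np.array([0.90, 0.75]), np.array([0.80])]
    fused_boxes, fused_labels, fused_scores = weighted_boxes_fusion(
        example_boxes,
        example_labels,
        example_scores,
        iou_thr=0.5,
        conf_type="avg",
        merge_type="weighted",
    )
    print("boxes:\n", fused_boxes)
    print("labels:", fused_labels)
    print("scores:", fused_scores)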