|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import warnings |
|
|
from typing import Dict, List, Tuple, Union, Literal |
|
|
import numpy as np |
|
|
import numpy.typing as npt |
|
|
|
|
|
|
|
|
def prefilter_boxes(
    boxes: List[npt.NDArray[np.float64]],
    scores: List[npt.NDArray[np.float64]],
    labels: List[npt.NDArray[np.int_]],
    weights: List[float],
    thr: float,
    class_agnostic: bool = False,
) -> Dict[Union[str, int], npt.NDArray[np.float64]]:
    """
    Clean up the raw detections of every model and group them for merging.

    Boxes scoring below `thr` are dropped, swapped corners are put back in
    order, coordinates are clipped to [0, 1] and zero-area boxes are skipped
    (each repair emits a warning). Every surviving box becomes a row
    `[label, score * weight, weight, model_index, x1, y1, x2, y2]`.

    Args:
        boxes (list[np array[n x 4]]): List of boxes. One list per model.
        scores (list[np array[n]]): List of confidences.
        labels (list[np array[n]]): List of labels.
        weights (list): Model weights.
        thr (float): Confidence threshold.
        class_agnostic (bool, optional): Merge boxes from different classes.
            Defaults to False.

    Returns:
        dict[np array [? x 8]]: Filtered boxes, keyed by label (or by "*" when
            class_agnostic is set) and sorted by decreasing weighted score.
    """
    grouped = {}

    for t, model_boxes in enumerate(boxes):
        assert len(model_boxes) == len(scores[t]), "len(boxes) != len(scores)"
        assert len(model_boxes) == len(labels[t]), "len(boxes) != len(labels)"

        for j in range(len(model_boxes)):
            score = scores[t][j]
            if score < thr:
                continue

            label = int(labels[t][j])
            box_part = model_boxes[j]
            x1, y1, x2, y2 = (float(box_part[i]) for i in range(4))

            # Put corners back in (min, max) order.
            if x2 < x1:
                warnings.warn("X2 < X1 value in box. Swap them.")
                x1, x2 = x2, x1
            if y2 < y1:
                warnings.warn("Y2 < Y1 value in box. Swap them.")
                y1, y2 = y2, y1

            # Clip to the unit square when any coordinate falls outside it.
            coords = np.array([x1, x2, y1, y2])
            if coords.min() < 0 or coords.max() > 1:
                warnings.warn("Coordinates outside [0, 1]")
                x1, x2, y1, y2 = np.clip(coords, 0, 1)

            if (x2 - x1) * (y2 - y1) == 0.0:
                warnings.warn("Zero area box skipped: {}.".format(box_part))
                continue

            row = [label, float(score) * weights[t], weights[t], t, x1, y1, x2, y2]
            group_key = "*" if class_agnostic else label
            grouped.setdefault(group_key, []).append(row)

    # Stack each group and order its rows by decreasing weighted score.
    result = {}
    for group_key, rows in grouped.items():
        stacked = np.array(rows)
        order = stacked[:, 1].argsort()[::-1]
        result[group_key] = stacked[order]

    return result
|
|
|
|
|
|
|
|
def merge_labels(
    labels: npt.NDArray[np.int_], confs: npt.NDArray[np.float64]
) -> int:
    """
    Custom function for merging labels.
    If all labels are the same, return the unique value.
    Else, return the label of the most confident non-title (class 2) box.

    Args:
        labels (np array [n]): Labels.
        confs (np array [n]): Confidence.

    Returns:
        int: Label.

    Raises:
        ValueError: If `labels` is empty.
    """
    labels = np.asarray(labels)
    confs = np.asarray(confs)

    if labels.size == 0:
        raise ValueError("Cannot merge an empty set of labels")

    if len(np.unique(labels)) == 1:
        return int(labels[0])

    # Both arrays must be filtered with the SAME mask (built from the labels),
    # otherwise confidences and labels fall out of alignment.
    keep = labels != 2
    return int(labels[keep][np.argmax(confs[keep])])
|
|
|
|
|
|
|
|
def get_weighted_box(
    boxes: npt.NDArray[np.float64], conf_type: Literal["avg", "max"] = "avg"
) -> npt.NDArray[np.float64]:
    """
    Fuse a cluster of boxes into one by confidence-weighted averaging.

    Each coordinate of the result is the average of the input coordinates,
    weighted by the confidence stored in column 1 of every row.

    Args:
        boxes (np array [n x 8]): Boxes to merge.
        conf_type (str, optional): Confidence merging type. Defaults to "avg".

    Returns:
        np array [8]: Merged box (float32). Slot 0 holds the merged label,
            slot 1 the max or mean confidence, slot 2 the summed weights,
            slot 3 is set to -1 and slots 4-7 hold the averaged coordinates.
    """
    merged = np.zeros(8, dtype=np.float32)
    confidences = []
    conf_sum = 0
    weight_sum = 0

    for row in boxes:
        row_conf = row[1]
        merged[4:] += row_conf * row[4:]
        conf_sum += row_conf
        weight_sum += row[2]
        confidences.append(row_conf)

    merged[0] = merge_labels(
        np.array([row[0] for row in boxes]), np.array([row[1] for row in boxes])
    )

    if conf_type == "max":
        merged[1] = np.max(confidences)
    else:
        merged[1] = np.mean(confidences)
    merged[2] = weight_sum
    merged[3] = -1
    # Turn the confidence-weighted coordinate sums into averages.
    merged[4:] /= conf_sum
    return merged
|
|
|
|
|
|
|
|
def get_biggest_box(
    boxes: npt.NDArray[np.float64], conf_type: Literal["avg", "max"] = "avg"
) -> npt.NDArray[np.float64]:
    """
    Fuse a cluster of boxes into the smallest box enclosing all of them.

    Args:
        boxes (np array [n x 8]): Boxes to merge.
        conf_type (str, optional): Confidence merging type. Defaults to "avg".

    Returns:
        np array [8]: Merged box (float32). Slot 0 holds the merged label,
            slot 1 the max or mean confidence, slot 2 the summed weights,
            slot 3 is set to -1 and slots 4-7 hold the enclosing coordinates.
    """
    merged = np.zeros(8, dtype=np.float32)
    # Start from the first box and grow the extent with every row.
    merged[4:] = boxes[0][4:]

    # Columns 4/5 shrink towards the minimum, columns 6/7 grow to the maximum.
    reducers = ((4, min), (5, min), (6, max), (7, max))
    confidences = []
    weight_sum = 0

    for row in boxes:
        for col, pick in reducers:
            merged[col] = pick(merged[col], row[col])
        confidences.append(row[1])
        weight_sum += row[2]

    merged[0] = merge_labels(
        np.array([row[0] for row in boxes]), np.array([row[1] for row in boxes])
    )

    if conf_type == "max":
        merged[1] = np.max(confidences)
    else:
        merged[1] = np.mean(confidences)
    merged[2] = weight_sum
    merged[3] = -1
    return merged
|
|
|
|
|
|
|
|
def find_matching_box_fast(
    boxes_list: npt.NDArray[np.float64],
    new_box: npt.NDArray[np.float64],
    match_iou: float,
) -> Tuple[int, float]:
    """
    Reimplementation of find_matching_box with numpy instead of loops.
    Gives significant speed up for larger arrays (~100x).
    This was previously the bottleneck since the function is called for every entry in the array.

    Only columns 4-7 (the coordinates) of the rows are compared; labels are
    not taken into account here.

    Args:
        boxes_list (np.ndarray): Array of boxes with shape (N, 8).
        new_box (np.ndarray): New box to match with shape (8,).
        match_iou (float): IoU threshold for matching.

    Returns:
        Tuple[int, float]: Index of best matching box (-1 if no match) and IoU value.
            When nothing matches, the returned IoU is `match_iou` itself.
    """

    def bb_iou_array(
        boxes: npt.NDArray[np.float64], new_box: npt.NDArray[np.float64]
    ) -> npt.NDArray[np.float64]:
        """IoU of every row of `boxes` (N x 4) against `new_box` (4,)."""
        # Corners of the intersection rectangle.
        x_a = np.maximum(boxes[:, 0], new_box[0])
        y_a = np.maximum(boxes[:, 1], new_box[1])
        x_b = np.minimum(boxes[:, 2], new_box[2])
        y_b = np.minimum(boxes[:, 3], new_box[3])

        inter_area = np.maximum(x_b - x_a, 0) * np.maximum(y_b - y_a, 0)

        boxes_area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
        new_box_area = (new_box[2] - new_box[0]) * (new_box[3] - new_box[1])

        union = boxes_area + new_box_area - inter_area

        # A degenerate pair (zero or NaN union) must score 0 rather than NaN:
        # np.argmax would otherwise select the NaN, and the threshold
        # comparison below would let it through as a match.
        with np.errstate(divide="ignore", invalid="ignore"):
            iou = inter_area / union
        return np.where(union > 0, iou, 0.0)

    if boxes_list.shape[0] == 0:
        return -1, match_iou

    ious = bb_iou_array(boxes_list[:, 4:], new_box[4:])

    best_idx = int(np.argmax(ious))
    best_iou = ious[best_idx]

    if best_iou <= match_iou:
        return -1, match_iou

    return best_idx, float(best_iou)
|
|
|
|
|
|
|
|
def weighted_boxes_fusion(
    boxes_list: List[npt.NDArray[np.float64]],
    labels_list: List[npt.NDArray[np.int_]],
    scores_list: List[npt.NDArray[np.float64]],
    iou_thr: float = 0.5,
    skip_box_thr: float = 0.0,
    conf_type: Literal["avg", "max"] = "avg",
    merge_type: Literal["weighted", "biggest"] = "weighted",
    class_agnostic: bool = False,
) -> Tuple[npt.NDArray[np.floating], npt.NDArray[np.floating], npt.NDArray[np.floating]]:
    """
    Custom WBF implementation that supports a class_agnostic mode and a biggest box fusion.
    Boxes are expected to be in normalized (x0, y0, x1, y1) format.

    Every model gets a weight of 1. Within each label group (a single group
    when class_agnostic is set), each box is linked to the other box it
    overlaps most, provided that IoU is above `iou_thr`; linked boxes form
    clusters and each cluster is fused into one output box.

    Args:
        boxes_list (list[np.ndarray[n x 4]]): List of boxes. One list per model.
        labels_list (list[np.ndarray[n]]): List of labels.
        scores_list (list[np.ndarray[n]]): List of confidences.
        iou_thr (float, optional): IoU threshold for matching. Defaults to 0.5.
        skip_box_thr (float, optional): Exclude boxes with score < skip_box_thr. Defaults to 0.0.
        conf_type (str, optional): Confidence merging type ("avg" or "max"). Defaults to "avg".
        merge_type (str, optional): Merge type ("weighted" or "biggest"). Defaults to "weighted".
        class_agnostic (bool, optional): Merge boxes from different classes. Defaults to False.

    Returns:
        numpy.ndarray [N x 4]: Array of bounding boxes.
        numpy.ndarray [N]: Array of labels (stored as floats, since they are
            sliced from the same array as the coordinates).
        numpy.ndarray [N]: Array of scores.
        The three arrays are ordered by decreasing score.
    """
    weights = np.ones(len(boxes_list))

    assert conf_type in ["avg", "max"], 'Conf type must be "avg" or "max"'
    assert merge_type in ["weighted", "biggest"], 'Merge type must be "weighted" or "biggest"'

    filtered_boxes = prefilter_boxes(
        boxes_list,
        scores_list,
        labels_list,
        weights,
        skip_box_thr,
        class_agnostic=class_agnostic,
    )
    if len(filtered_boxes) == 0:
        return np.zeros((0, 4)), np.zeros((0,)), np.zeros((0,))

    overall_boxes = []
    for boxes in filtered_boxes.values():
        num_boxes = len(boxes)
        clusters: List[List[int]] = []

        for j in range(num_boxes):
            # Look for the best partner of box j among all the other boxes.
            ids = [i for i in range(num_boxes) if i != j]
            index, _ = find_matching_box_fast(boxes[ids], boxes[j], iou_thr)

            if index == -1:
                clusters.append([j])
                continue

            # Map the position within `ids` back to an index into `boxes`.
            index = ids[index]
            hits = [
                clust_idx
                for clust_idx, clust in enumerate(clusters)
                if (j in clust or index in clust)
            ]
            if not hits:
                clusters.append([index, j])
                continue

            # j and its partner may already sit in two different clusters.
            # Fold every hit cluster into the first one so that no box ends up
            # in (and gets fused by) more than one cluster.
            merged = {index, j}
            for clust_idx in hits:
                merged.update(clusters[clust_idx])
            # sorted() keeps the member order deterministic.
            clusters[hits[0]] = sorted(merged)
            for clust_idx in reversed(hits[1:]):
                del clusters[clust_idx]

        for c in clusters:
            if merge_type == "weighted":
                weighted_box = get_weighted_box(boxes[c], conf_type)
            elif merge_type == "biggest":
                weighted_box = get_biggest_box(boxes[c], conf_type)

            if conf_type == "max":
                weighted_box[1] = weighted_box[1] / weights.max()
            else:
                # NOTE(review): a cluster holding more boxes than there are
                # models makes this factor exceed 1 -- confirm this is intended.
                weighted_box[1] = weighted_box[1] * len(c) / weights.sum()
            overall_boxes.append(weighted_box)

    overall_boxes = np.array(overall_boxes)
    overall_boxes = overall_boxes[overall_boxes[:, 1].argsort()[::-1]]
    boxes = overall_boxes[:, 4:]
    scores = overall_boxes[:, 1]
    labels = overall_boxes[:, 0]
    return boxes, labels, scores
|
|
|