"""Convert per-clip XML annotation files into clip annotation JSON files.

For every clip listed in the JSON dump given by --json_dump_path, the XML file
``<xml_dir>/<clip_name>.xml`` is parsed and ``<output_path>/<clip_name>.json``
is written.
"""

from colorsys import hls_to_rgb
import json
import os
import xml.etree.ElementTree as ET

from absl import app
from absl import flags
import numpy as np

from fish_length import Fish_Length
from tracker import Tracker

flags.DEFINE_string(
    'json_dump_path', None, 'Path to json containing annotated clip info.'
)
flags.DEFINE_string(
    'xml_dir', None, 'Directory containing xml annotation files.'
)
flags.DEFINE_string(
    'output_path', None, 'Directory to output clip annotation jsons.'
)
flags.mark_flag_as_required('json_dump_path')
flags.mark_flag_as_required('xml_dir')
flags.mark_flag_as_required('output_path')
FLAGS = flags.FLAGS


def _polygon_bbox(polygon, aris_info):
    """Return the normalized bbox of a drawn polygon as a 4-element list.

    The list holds, in order: x of the first point, y of the first point,
    x of the third point and y of the second point, with x values divided by
    ``aris_info['xdim']`` and y values by ``aris_info['ydim']``.
    """
    xdim = aris_info['xdim']
    ydim = aris_info['ydim']
    return list(np.array([
        int(polygon.findtext('pt/x')) / xdim,
        int(polygon.findtext('pt/y')) / ydim,
        int(polygon.findall('pt/x')[2].text) / xdim,
        int(polygon.findall('pt/y')[1].text) / ydim,
    ]))


def _fill_stationary(entries, previous_bbox, next_bbox):
    """Assign bboxes to stationary entries that precede a drawn box.

    With a previous drawn box, the entries are linearly interpolated between
    ``previous_bbox`` and ``next_bbox``. Without one (the track starts with
    stationary polygons) there is nothing to interpolate from, so every entry
    receives a copy of ``next_bbox``.
    """
    if previous_bbox is None:
        for entry in entries:
            entry['bbox'] = list(next_bbox)
        return

    origin = np.array(previous_bbox)
    delta = np.array(next_bbox) - origin
    segments = len(entries) + 1
    for step, entry in enumerate(entries, start=1):
        entry['bbox'] = list(origin + (step * delta) / segments)


def _bbox_of_fish(frames, frame_index, fish_id, which, clip_name):
    """Return the bbox recorded for ``fish_id`` in ``frames[frame_index]``.

    Raises:
        RuntimeWarning: If the frame holds no entry for that fish.
    """
    for entry in frames[frame_index]['fish']:
        if entry['fish_id'] == fish_id:
            return entry['bbox']
    raise RuntimeWarning(
        f'{which} box of fish {fish_id} in {clip_name} is not defined.')


def get_annotation_from_clip(clip):
    """Build the annotation dict of one clip from its XML annotation file.

    Args:
        clip: Clip entry from the JSON dump. The keys read are ``clip_name``,
            ``clip_id``, ``aris_filename``, ``start_frame``, ``end_frame``,
            ``upstream_direction`` and ``aris_info`` (``pixel_meter_size``,
            ``xdim``, ``ydim``).

    Returns:
        The result of ``Fish_Length.add_lengths`` applied to a dict holding
        the clip metadata, a ``frames`` list (one entry per frame, each with
        its ``fish`` bbox entries) and a ``fish`` list (one entry per XML
        ``object``).

    Raises:
        RuntimeWarning: If a fish has no entry in the frame at its start or
            end frame index.
    """
    clip_name = clip['clip_name']
    root = ET.parse(os.path.join(FLAGS.xml_dir, clip_name + '.xml')).getroot()
    aris_info = clip['aris_info']

    data = {
        'clip_id': clip['clip_id'],
        'aris_filename': clip['aris_filename'],
        'start_frame': clip['start_frame'],
        'end_frame': clip['end_frame'],
        'upstream_direction': clip['upstream_direction'],
        'image_meter_width': (
            aris_info['pixel_meter_size'] * aris_info['xdim']),
    }

    # Create image entries. The frame range is taken from the first object.
    annotated_objects = root.findall('object')
    first_object = annotated_objects[0]
    first_offset = int(first_object.findtext('startFrame'))
    last_offset = int(first_object.findtext('endFrame'))
    frames = [
        {'frame_num': clip['start_frame'] + offset, 'fish': []}
        for offset in range(first_offset, last_offset + 1)
    ]

    # Populate images with bboxes
    fishes = []
    for track_id, annotated_object in enumerate(annotated_objects):
        fish = {
            'id': track_id,
            'length': -1,
            'direction': 'N/A',
            'start_frame_index': -1,
            'end_frame_index': -1,
            'color': Tracker.selectColor(track_id),
        }
        fishes.append(fish)

        last_drawn = None
        pending_stationary = []
        for polygon in annotated_object.findall('polygon'):
            label = int(polygon.findtext('pt/l'))
            if label == -1:
                continue

            # NOTE(review): <t> is used directly as an index into `frames`,
            # which lines up only if the first object's startFrame is 0 --
            # confirm against the annotation format.
            index = int(polygon.find('t').text)
            frame = frames[index]
            frame_entry = {
                'fish_id': track_id,
                'bbox': None,
                'visible': 1,
                'human_labeled': label,
            }
            frame['fish'].append(frame_entry)

            # Determine if polygon is stationary
            stationary = polygon.findtext('s')
            if stationary is not None and int(stationary):
                pending_stationary.append(frame_entry)
            else:
                bbox = _polygon_bbox(polygon, aris_info)
                # Coordinates of greater than 1.1 will cause training to fail
                if (np.array(bbox) > 1.1).any():
                    print('Error: Invalid bbox.')
                    frame['fish'].pop()
                    continue
                frame_entry['bbox'] = bbox

                # Interpolate if there are stationary boxes
                if pending_stationary:
                    _fill_stationary(pending_stationary, last_drawn, bbox)
                    pending_stationary = []
                # Remember the box that was actually drawn, not an
                # interpolated one, as the anchor for later interpolation.
                last_drawn = bbox

            if fish['start_frame_index'] == -1:
                fish['start_frame_index'] = index
            fish['end_frame_index'] = index

        # Trailing stationary polygons keep the last drawn box.
        for stationary_entry in pending_stationary:
            stationary_entry['bbox'] = last_drawn

    data['frames'] = frames
    data['fish'] = fishes

    # Add track
    for fish in fishes:
        start_bbox = _bbox_of_fish(
            frames, fish['start_frame_index'], fish['id'], 'Start', clip_name)
        end_bbox = _bbox_of_fish(
            frames, fish['end_frame_index'], fish['id'], 'End', clip_name)
        fish['direction'] = Tracker.get_direction(start_bbox, end_bbox)

    return Fish_Length.add_lengths(data)


def main(argv):
    """Write one annotation JSON per clip listed in the JSON dump.

    Args:
        argv: Positional command-line arguments passed by ``app.run``; unused.
    """
    del argv  # Unused.
    with open(FLAGS.json_dump_path) as json_file:
        json_dump = json.load(json_file)

    for clip in json_dump:
        data = get_annotation_from_clip(clip)
        output_file_path = os.path.join(
            FLAGS.output_path, f'{clip["clip_name"]}.json')
        with open(output_file_path, 'w') as output_file:
            json.dump(data, output_file, indent=2)


if __name__ == '__main__':
    app.run(main)