""" Utilities for operating on encoded Midi sequences. """ from collections import defaultdict from anticipation.config import * from anticipation.vocab import * def print_tokens(tokens): print('---------------------') for j, (tm, dur, note) in enumerate(zip(tokens[0::3],tokens[1::3],tokens[2::3])): if note == SEPARATOR: assert tm == SEPARATOR and dur == SEPARATOR print(j, 'SEPARATOR') continue if note == REST: assert tm < CONTROL_OFFSET assert dur == DUR_OFFSET+0 print(j, tm, 'REST') continue if note < CONTROL_OFFSET: tm = tm - TIME_OFFSET dur = dur - DUR_OFFSET note = note - NOTE_OFFSET instr = note//2**7 pitch = note - (2**7)*instr print(j, tm, dur, instr, pitch) else: tm = tm - ATIME_OFFSET dur = dur - ADUR_OFFSET note = note - ANOTE_OFFSET instr = note//2**7 pitch = note - (2**7)*instr print(j, tm, dur, instr, pitch, '(A)') def clip(tokens, start, end, clip_duration=True, seconds=True): if seconds: start = int(TIME_RESOLUTION*start) end = int(TIME_RESOLUTION*end) new_tokens = [] for (time, dur, note) in zip(tokens[0::3],tokens[1::3],tokens[2::3]): if note < CONTROL_OFFSET: this_time = time - TIME_OFFSET this_dur = dur - DUR_OFFSET else: this_time = time - ATIME_OFFSET this_dur = dur - ADUR_OFFSET if this_time < start or end < this_time: continue # truncate extended notes if clip_duration and end < this_time + this_dur: dur -= this_time + this_dur - end new_tokens.extend([time, dur, note]) return new_tokens def mask(tokens, start, end): new_tokens = [] for (time, dur, note) in zip(tokens[0::3],tokens[1::3],tokens[2::3]): if note < CONTROL_OFFSET: this_time = (time - TIME_OFFSET)/float(TIME_RESOLUTION) else: this_time = (time - ATIME_OFFSET)/float(TIME_RESOLUTION) if start < this_time < end: continue new_tokens.extend([time, dur, note]) return new_tokens def delete(tokens, criterion): new_tokens = [] for token in zip(tokens[0::3],tokens[1::3],tokens[2::3]): if criterion(token): continue new_tokens.extend(token) return new_tokens def sort(tokens): """ sort sequence of events or controls (but not both) """ times = tokens[0::3] indices = sorted(range(len(times)), key=times.__getitem__) sorted_tokens = [] for idx in indices: sorted_tokens.extend(tokens[3*idx:3*(idx+1)]) return sorted_tokens def split(tokens): """ split a sequence into events and controls """ events = [] controls = [] for (time, dur, note) in zip(tokens[0::3],tokens[1::3],tokens[2::3]): if note < CONTROL_OFFSET: events.extend([time, dur, note]) else: controls.extend([time, dur, note]) return events, controls def pad(tokens, end_time=None, density=TIME_RESOLUTION): end_time = TIME_OFFSET+(end_time if end_time else max_time(tokens, seconds=False)) new_tokens = [] previous_time = TIME_OFFSET+0 for (time, dur, note) in zip(tokens[0::3],tokens[1::3],tokens[2::3]): # must pad before separation, anticipation assert note < CONTROL_OFFSET # insert pad tokens to ensure the desired density while time > previous_time + density: new_tokens.extend([previous_time+density, DUR_OFFSET+0, REST]) previous_time += density new_tokens.extend([time, dur, note]) previous_time = time while end_time > previous_time + density: new_tokens.extend([previous_time+density, DUR_OFFSET+0, REST]) previous_time += density return new_tokens def unpad(tokens): new_tokens = [] for (time, dur, note) in zip(tokens[0::3],tokens[1::3],tokens[2::3]): if note == REST: continue new_tokens.extend([time, dur, note]) return new_tokens def anticipate(events, controls, delta=DELTA*TIME_RESOLUTION): """ Interleave a sequence of events with anticipated controls. 
    Inputs:
      events   : a sequence of events
      controls : a sequence of time-localized controls
      delta    : the anticipation interval

    Returns:
      tokens   : interleaved events and anticipated controls
      controls : unconsumed controls (control time > max_time(events) + delta)
    """

    if len(controls) == 0:
        return events, controls

    tokens = []
    event_time = 0
    control_time = controls[0] - ATIME_OFFSET
    for time, dur, note in zip(events[0::3],events[1::3],events[2::3]):
        while event_time >= control_time - delta:
            tokens.extend(controls[0:3])
            controls = controls[3:] # consume this control
            control_time = controls[0] - ATIME_OFFSET if len(controls) > 0 else float('inf')

        assert note < CONTROL_OFFSET
        event_time = time - TIME_OFFSET
        tokens.extend([time, dur, note])

    return tokens, controls


def sparsity(tokens):
    """ return the largest time gap between consecutive events, in ticks """

    max_dt = 0
    previous_time = TIME_OFFSET+0
    for (time, dur, note) in zip(tokens[0::3],tokens[1::3],tokens[2::3]):
        if note == SEPARATOR:
            continue

        assert note < CONTROL_OFFSET # don't operate on interleaved sequences
        max_dt = max(max_dt, time - previous_time)
        previous_time = time

    return max_dt


def min_time(tokens, seconds=True, instr=None):
    """ return the earliest onset time (seconds by default), optionally restricted to one instrument """

    mt = None
    for time, dur, note in zip(tokens[0::3],tokens[1::3],tokens[2::3]):
        # stop calculating at sequence separator
        if note == SEPARATOR:
            break

        if note < CONTROL_OFFSET:
            time -= TIME_OFFSET
            note -= NOTE_OFFSET
        else:
            time -= ATIME_OFFSET
            note -= ANOTE_OFFSET

        # min time of a particular instrument
        if instr is not None and instr != note//2**7:
            continue

        mt = time if mt is None else min(mt, time)

    if mt is None:
        mt = 0

    return mt/float(TIME_RESOLUTION) if seconds else mt


def max_time(tokens, seconds=True, instr=None):
    """ return the latest onset time (seconds by default), optionally restricted to one instrument """

    mt = 0
    for time, dur, note in zip(tokens[0::3],tokens[1::3],tokens[2::3]):
        # keep checking for max_time, even if it appears after a separator
        # (this is important because we use this check for vocab overflow in tokenization)
        if note == SEPARATOR:
            continue

        if note < CONTROL_OFFSET:
            time -= TIME_OFFSET
            note -= NOTE_OFFSET
        else:
            time -= ATIME_OFFSET
            note -= ANOTE_OFFSET

        # max time of a particular instrument
        if instr is not None and instr != note//2**7:
            continue

        mt = max(mt, time)

    return mt/float(TIME_RESOLUTION) if seconds else mt


def get_instruments(tokens):
    """ count the number of notes per instrument in the sequence """

    instruments = defaultdict(int)
    for time, dur, note in zip(tokens[0::3],tokens[1::3],tokens[2::3]):
        if note >= SPECIAL_OFFSET:
            continue

        if note < CONTROL_OFFSET:
            note -= NOTE_OFFSET
        else:
            note -= ANOTE_OFFSET

        instr = note//2**7
        instruments[instr] += 1

    return instruments


def translate(tokens, dt, seconds=False):
    """ shift all onsets by dt (ticks by default, or seconds if seconds=True) """

    if seconds:
        dt = int(TIME_RESOLUTION*dt)

    new_tokens = []
    for (time, dur, note) in zip(tokens[0::3],tokens[1::3],tokens[2::3]):
        # stop translating after EOT
        if note == SEPARATOR:
            new_tokens.extend([time, dur, note])
            dt = 0
            continue

        if note < CONTROL_OFFSET:
            this_time = time - TIME_OFFSET
        else:
            this_time = time - ATIME_OFFSET

        assert 0 <= this_time + dt
        new_tokens.extend([time+dt, dur, note])

    return new_tokens


def combine(events, controls):
    """ strip the control offset from controls and merge them with events in time order """

    return sort(events + [token - CONTROL_OFFSET for token in controls])
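

# ---------------------------------------------------------------------------
# Illustrative usage sketch (an assumed example, not part of the original
# module). It relies only on the vocab offsets imported above; the pitches,
# onsets, and durations below are arbitrary values chosen for demonstration.
# Sequences are flat lists of (time, duration, note) triples, with
# note = (2**7)*instrument + pitch relative to the appropriate offset.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    # two piano events (instrument 0): middle C at t=0s and E at t=1s,
    # each lasting 10 ticks (i.e. 10/TIME_RESOLUTION seconds)
    events = [TIME_OFFSET+0,               DUR_OFFSET+10, NOTE_OFFSET+(2**7)*0+60,
              TIME_OFFSET+TIME_RESOLUTION, DUR_OFFSET+10, NOTE_OFFSET+(2**7)*0+64]

    # one anticipated control: a G at t=0.5s, encoded with the control offsets
    controls = [ATIME_OFFSET+TIME_RESOLUTION//2, ADUR_OFFSET+10, ANOTE_OFFSET+(2**7)*0+67]

    # interleave the control into the event stream and inspect the result;
    # controls further than delta past the last event are returned unconsumed
    tokens, remaining = anticipate(events, controls)
    print_tokens(tokens)
    print('unconsumed controls:', remaining)

    # split() separates the interleaved stream back into events and controls
    ev, ct = split(tokens)
    assert ev == events and ct + remaining == controls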