"""
Utilities for operating on encoded MIDI sequences.
"""

from collections import defaultdict

from anticipation.config import *
from anticipation.vocab import *


def print_tokens(tokens):
    """ print a human-readable representation of an encoded token sequence """
    print('---------------------')
    for j, (tm, dur, note) in enumerate(zip(tokens[0::3],tokens[1::3],tokens[2::3])):
        if note == SEPARATOR:
            assert tm == SEPARATOR and dur == SEPARATOR
            print(j, 'SEPARATOR')
            continue

        if note == REST:
            assert tm < CONTROL_OFFSET
            assert dur == DUR_OFFSET+0
            print(j, tm, 'REST')
            continue

        if note < CONTROL_OFFSET:
            tm = tm - TIME_OFFSET
            dur = dur - DUR_OFFSET
            note = note - NOTE_OFFSET
            instr = note//2**7
            pitch = note - (2**7)*instr
            print(j, tm, dur, instr, pitch)
        else:
            tm = tm - ATIME_OFFSET
            dur = dur - ADUR_OFFSET
            note = note - ANOTE_OFFSET
            instr = note//2**7
            pitch = note - (2**7)*instr
            print(j, tm, dur, instr, pitch, '(A)')


def clip(tokens, start, end, clip_duration=True, seconds=True):
    """ extract the tokens with onsets in the interval [start, end] """
    if seconds:
        start = int(TIME_RESOLUTION*start)
        end = int(TIME_RESOLUTION*end)

    new_tokens = []
    for (time, dur, note) in zip(tokens[0::3],tokens[1::3],tokens[2::3]):
        if note < CONTROL_OFFSET:
            this_time = time - TIME_OFFSET
            this_dur = dur - DUR_OFFSET
        else:
            this_time = time - ATIME_OFFSET
            this_dur = dur - ADUR_OFFSET

        if this_time < start or end < this_time:
            continue

        # truncate extended notes
        if clip_duration and end < this_time + this_dur:
            dur -= this_time + this_dur - end

        new_tokens.extend([time, dur, note])

    return new_tokens
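
# A usage sketch for clip (illustrative values, not part of the library): start
# and end are given in seconds by default and converted via TIME_RESOLUTION;
# pass seconds=False if they are already expressed in vocabulary time steps.
#
#   window = clip(tokens, 5.0, 10.0)                # onsets between 5s and 10s
#   window = clip(tokens, t0, t1, seconds=False)    # t0, t1 already in time steps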


def mask(tokens, start, end):
    """ remove the tokens with onsets inside the interval (start, end), given in seconds """
    new_tokens = []
    for (time, dur, note) in zip(tokens[0::3],tokens[1::3],tokens[2::3]):
        if note < CONTROL_OFFSET:
            this_time = (time - TIME_OFFSET)/float(TIME_RESOLUTION)
        else:
            this_time = (time - ATIME_OFFSET)/float(TIME_RESOLUTION)

        if start < this_time < end:
            continue

        new_tokens.extend([time, dur, note])

    return new_tokens


def delete(tokens, criterion):
    """ remove the (time, dur, note) triples for which criterion(triple) is True """
    new_tokens = []
    for token in zip(tokens[0::3],tokens[1::3],tokens[2::3]):
        if criterion(token):
            continue

        new_tokens.extend(token)

    return new_tokens
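
# A usage sketch for delete (illustrative only): the criterion receives each
# (time, dur, note) triple, so e.g. REST tokens can be dropped with a predicate.
#
#   no_rests = delete(tokens, lambda triple: triple[2] == REST)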


def sort(tokens):
    """ sort a sequence of events or controls (but not both) """
    times = tokens[0::3]
    indices = sorted(range(len(times)), key=times.__getitem__)

    sorted_tokens = []
    for idx in indices:
        sorted_tokens.extend(tokens[3*idx:3*(idx+1)])

    return sorted_tokens


def split(tokens):
    """ split a sequence into events and controls """
    events = []
    controls = []
    for (time, dur, note) in zip(tokens[0::3],tokens[1::3],tokens[2::3]):
        if note < CONTROL_OFFSET:
            events.extend([time, dur, note])
        else:
            controls.extend([time, dur, note])

    return events, controls


def pad(tokens, end_time=None, density=TIME_RESOLUTION):
    """ insert REST tokens so that successive onsets are at most `density` steps apart """
    end_time = TIME_OFFSET+(end_time if end_time else max_time(tokens, seconds=False))
    new_tokens = []
    previous_time = TIME_OFFSET+0
    for (time, dur, note) in zip(tokens[0::3],tokens[1::3],tokens[2::3]):
        # must pad before separation, anticipation
        assert note < CONTROL_OFFSET

        # insert pad tokens to ensure the desired density
        while time > previous_time + density:
            new_tokens.extend([previous_time+density, DUR_OFFSET+0, REST])
            previous_time += density

        new_tokens.extend([time, dur, note])
        previous_time = time

    # continue padding out to end_time after the last event
    while end_time > previous_time + density:
        new_tokens.extend([previous_time+density, DUR_OFFSET+0, REST])
        previous_time += density

    return new_tokens


def unpad(tokens):
    """ remove the REST tokens from a sequence """
    new_tokens = []
    for (time, dur, note) in zip(tokens[0::3],tokens[1::3],tokens[2::3]):
        if note == REST: continue

        new_tokens.extend([time, dur, note])

    return new_tokens
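
# A usage sketch for pad/unpad (illustrative only): with the default density of
# TIME_RESOLUTION steps, REST tokens are inserted wherever successive onsets are
# more than one second apart; unpad strips all REST tokens back out.
#
#   dense = pad(events)
#   sparse = unpad(dense)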


def anticipate(events, controls, delta=DELTA*TIME_RESOLUTION):
    """
    Interleave a sequence of events with anticipated controls.

    Inputs:
        events   : a sequence of events
        controls : a sequence of time-localized controls
        delta    : the anticipation interval

    Returns:
        tokens   : interleaved events and anticipated controls
        controls : unconsumed controls (control time > max_time(events) + delta)
    """
    if len(controls) == 0:
        return events, controls

    tokens = []
    event_time = 0
    control_time = controls[0] - ATIME_OFFSET
    for time, dur, note in zip(events[0::3],events[1::3],events[2::3]):
        while event_time >= control_time - delta:
            tokens.extend(controls[0:3])
            controls = controls[3:]  # consume this control
            control_time = controls[0] - ATIME_OFFSET if len(controls) > 0 else float('inf')

        assert note < CONTROL_OFFSET
        event_time = time - TIME_OFFSET
        tokens.extend([time, dur, note])

    return tokens, controls
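
# A usage sketch for split/anticipate (illustrative only): each control is
# interleaved just before the first event within delta of its onset, and any
# controls falling beyond the final event are returned unconsumed.
#
#   events, controls = split(tokens)
#   interleaved, leftover = anticipate(events, controls)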


def sparsity(tokens):
    """ calculate the largest time gap between successive events in a sequence """
    max_dt = 0
    previous_time = TIME_OFFSET+0
    for (time, dur, note) in zip(tokens[0::3],tokens[1::3],tokens[2::3]):
        if note == SEPARATOR: continue
        assert note < CONTROL_OFFSET  # don't operate on interleaved sequences

        max_dt = max(max_dt, time - previous_time)
        previous_time = time

    return max_dt


def min_time(tokens, seconds=True, instr=None):
    """ return the earliest onset time, optionally restricted to one instrument """
    mt = None
    for time, dur, note in zip(tokens[0::3],tokens[1::3],tokens[2::3]):
        # stop calculating at sequence separator
        if note == SEPARATOR: break

        if note < CONTROL_OFFSET:
            time -= TIME_OFFSET
            note -= NOTE_OFFSET
        else:
            time -= ATIME_OFFSET
            note -= ANOTE_OFFSET

        # min time of a particular instrument
        if instr is not None and instr != note//2**7:
            continue

        mt = time if mt is None else min(mt, time)

    if mt is None: mt = 0

    return mt/float(TIME_RESOLUTION) if seconds else mt


def max_time(tokens, seconds=True, instr=None):
    """ return the latest onset time, optionally restricted to one instrument """
    mt = 0
    for time, dur, note in zip(tokens[0::3],tokens[1::3],tokens[2::3]):
        # keep checking for max_time, even if it appears after a separator
        # (this is important because we use this check for vocab overflow in tokenization)
        if note == SEPARATOR: continue

        if note < CONTROL_OFFSET:
            time -= TIME_OFFSET
            note -= NOTE_OFFSET
        else:
            time -= ATIME_OFFSET
            note -= ANOTE_OFFSET

        # max time of a particular instrument
        if instr is not None and instr != note//2**7:
            continue

        mt = max(mt, time)

    return mt/float(TIME_RESOLUTION) if seconds else mt


def get_instruments(tokens):
    """ count the note tokens associated with each instrument in a sequence """
    instruments = defaultdict(int)
    for time, dur, note in zip(tokens[0::3],tokens[1::3],tokens[2::3]):
        if note >= SPECIAL_OFFSET: continue

        if note < CONTROL_OFFSET:
            note -= NOTE_OFFSET
        else:
            note -= ANOTE_OFFSET

        instr = note//2**7
        instruments[instr] += 1

    return instruments


def translate(tokens, dt, seconds=False):
    """ shift all onsets in a sequence by the time offset dt """
    if seconds:
        dt = int(TIME_RESOLUTION*dt)

    new_tokens = []
    for (time, dur, note) in zip(tokens[0::3],tokens[1::3],tokens[2::3]):
        # stop translating after EOT
        if note == SEPARATOR:
            new_tokens.extend([time, dur, note])
            dt = 0
            continue

        if note < CONTROL_OFFSET:
            this_time = time - TIME_OFFSET
        else:
            this_time = time - ATIME_OFFSET

        assert 0 <= this_time + dt
        new_tokens.extend([time+dt, dur, note])

    return new_tokens
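
# A usage sketch for translate (illustrative only): shift a clipped passage so
# that it begins at time zero.
#
#   passage = clip(tokens, 5.0, 10.0)
#   passage = translate(passage, -min_time(passage, seconds=False))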


def combine(events, controls):
    return sort(events + [token - CONTROL_OFFSET for token in controls])
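
# A usage sketch for combine (illustrative only): controls are merged back into
# the event stream as ordinary events (control offsets removed), sorted by onset.
#
#   events, controls = split(tokens)
#   merged = combine(events, controls)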