Source code for asmd.dataset_utils

from copy import deepcopy

import numpy as np
from sklearn.utils import check_random_state

from . import utils


[docs]def choice(dataset, p=[0.6, 0.2, 0.2], random_state=None): """ Returns N non-overlapping datasets randomply sampled from `dataset`, where N is `len(p)`; each song belong to a dataset according to the distribution probability `p`. Note that `p` is always normalized to sum to 1. `random_state` is an int or a np.random.RandomState object. """ # normalize p p = np.asarray(p, dtype=np.float32) p /= p.sum() # generating non-overlapping splits random_state = check_random_state(random_state) splits = random_state.choice(np.arange(len(p)), p=p, size=(len(dataset), )) # creating output datasets out = [] for i in range(len(p)): d = deepcopy(dataset) d.paths = np.asarray(dataset.paths, dtype=object)[splits == i].tolist() # excluding/including songs for j, song in enumerate(d.get_songs()): if splits[j] == i: song['included'] = True else: song['included'] = False out.append(d) return tuple(out)
[docs]def chose_score_type(score_type, gts): """ Return the proper score type according to the following rules Parameters --- score_type : list of str The key to retrieve the list of notes from the ground_truths. If multiple keys are provided, only one is retrieved by using the following criteria: if there is `precise_alignment` in the list of keys and in the ground truth, use that; otherwise, if there is `broad_alignment` in the list of keys and in the ground truth, use that; otherwise if `misaligned` in the list of keys and in the ground truth, use use `score`. gts : list of dict The list of ground truths from which you want to chose a score_type """ if len(score_type) > 1: if 'precise_alignment' in score_type and len( gts[0]['precise_alignment']['pitches']) > 0: score_type = 'precise_alignment' elif 'broad_alignment' in score_type and len( gts[0]['broad_alignment']['pitches']) > 0: score_type = 'broad_alignment' elif 'misaligned' in score_type and len( gts[0]['misaligned']['pitches']) > 0: score_type = 'misaligned' else: score_type = 'score' else: score_type = score_type[0] return score_type
[docs]def filter(dataset, instruments=[], ensemble=None, mixed=True, sources=False, all=False, composer='', datasets=[], groups=[], ground_truth=[], copy=False): """ Filter the paths of the songs which accomplish the filter described in `kwargs`. If this dataset was already fltered, only filters those paths that are already included. For advanced usage: So that a dataset can be filtered, it must have the following keys: * songs * name * included All the attributes are checked at the song level, except for: * `ensemble`: this is checked at the dataset-level (i.e. each dataset can be for ensemble or not) This may change in future releases * `ground_truth`: this is checked at group level (i.e. each subgroup can have different annotations) Similarly, each song must have the key ``included`` and optionally the other keys that you want to filter, as described by the arguments of this function. Arguments --------- instruments : list of str a list of strings representing the instruments that you want to select (exact match with song) ensemble : bool if loading songs which are composed for an ensemble of instrument. If None, ensemble field will not be checked and will select both (default None) mixed : bool if returning the mixed track for ensemble song (default True ) sources : bool if returning the source track for ensemble recording which provide it (default False ) all : bool only valid if sources is True : if True , all sources (audio and ground-truth) are returned, if False, only the first target instrument is returned. Default False. composer : string the surname of the composer to filter groups : list of strings a list of strings containing the name of the groups that you want to retrieve with a logic 'AND' among them. If empty, all groups are used. Example of groups are: 'train', 'validation', 'test'. The available groups depend on the dataset. Only Maestro dataset supported for now. datasets : list of strings a list of strings containing the name of the datasets to be used. If empty, all datasets are used. See :doc:`License` for the list of default datasets. The matching is case insensitive. ground_truth : dict[str, int] a dictionary (string, int) representing the type of ground-truths needed (logical AND among list elements). Each entry has the form `needed_ground_truth_type` as key and `level_of_truth` as value, where `needed_ground_truth_type` is the key of the ground_truth dictionary and `level_of_truth` is an int ranging from 0 to 2 (0->False, 1->True (manual annotation), 2->True(automatic annotation)). If only part of a dataset contains a certain ground-truth type, you should use the `group` attribute to only select those songs. copy : bool If True, a new Dataset object is returned, and the calling one is leaved untouched Returns ------- The input dataset as modified: `d = Dataset().filter(...)` If ``copy`` is True, return a new Dataset object. """ if copy: ret = deepcopy(dataset) else: ret = dataset # let's remove everything and put only the wanted ones ret.paths = [] datasets = [d.lower() for d in datasets] end = 0 for mydataset in ret.datasets: FLAG = True if not mydataset['included']: FLAG = False if len(datasets) > 0: if mydataset['name'].lower() in datasets: FLAG = True else: FLAG = False # checking dataset-level filters if ensemble is not None: if ensemble != mydataset['ensemble']: FLAG = False # adding groups if ground_truth is checked groups_gt = set() for gt, val in ground_truth: for group, group_gt in mydataset['ground_truth']: if group_gt[gt] == val: groups_gt.add(group) if FLAG: ret._chunks[mydataset['name']] = [end, end] for song in mydataset['songs']: FLAG = True if not song['included']: FLAG = False # checking song levels filters if instruments: if instruments != song['instruments']: FLAG = False if composer: if composer not in song['composer']: FLAG = False if groups: for group in groups: if group not in song['groups']: FLAG = False break # checking groups taken for group-level filtering if groups_gt: if len(groups_gt.intersection(song['groups'])) == 0: FLAG = False if FLAG: gts = song['ground_truth'] source = [] mix = [] if sources and "sources" in song.keys(): if all: source = song['sources']['path'] else: # find the index of the instrument instrument = instruments[0] idx = song['instruments'].index(instrument) # take index of the target instrument source = song['sources']['path'][idx] gts = song['ground_truth'][idx] if mixed: mix = song['recording']['path'] ret.paths.append([mix, source, gts]) end += 1 else: song['included'] = False ret._chunks[mydataset['name']][1] = end else: # exclude dataset and all its songs mydataset['included'] = False for song in mydataset['songs']: song['included'] = False _check_consistency(ret, fix=True) return ret
[docs]def _check_consistency(dataset, fix=False): """ Checks that is a dataset is included, then at least one of its songs is included and that if a dataset is excluded, then all of its songs are excluded. If `fix` is True, if fixes the dataset inclusion, otherwise raise a `RuntimeError` """ for d in dataset.datasets: included_songs = [s['included'] for s in d['songs']] if d['included']: if not any(included_songs): if fix: d['included'] = False else: raise RuntimeError( f"{d['name']} is included but no song is included") else: if any(included_songs): if fix: d['included'] = True else: raise RuntimeError( f"{d['name']} is excluded but at least one song is included" )
[docs]def get_score_mat(dataset, idx, score_type=['misaligned'], return_notes=''): """ Get the score of a certain score, with times of `score_type` Arguments --------- idx : int The index of the song to retrieve. score_type : list of str The key to retrieve the list of notes from the ground_truths. see `chose_score_type` for explanation return_notes : str ``'missing'``, ``'extra'`` or ``'both'``; the notes that will be returned together with the score; see ``asmd.asmd.Dataset.get_missing_extra_notes`` for more info Returns ------- numpy.ndarray : A (n x 6) array where columns represent pitches, onsets (seconds), offsets (seconds), velocities, MIDI program instrument and number of the instrument. Ordered by onsets. If some information is not available, value -255 is used. The array is sorted by onset, pitch and offset (in this order) numpy.ndarray : A boolean array with True if the note is missing or extra (depending on ``return_notes``); only if ``return_notes is not None`` numpy.ndarray : Another boolean array with True if the note is missing or extra (depending on ``return_notes``); only if ``return_notes == 'both'`` """ gts = dataset.get_gts(idx) score_type = chose_score_type(score_type, gts) # print(" Loading ground truth " + score_type) mat = [] for i, gt in enumerate(gts): # initilize each column pitches = np.array(gt[score_type]['pitches']) ons = np.array(gt[score_type]['onsets']) if not len(ons): ons = np.full_like(pitches, -255) missing = len(pitches) - len(ons) if missing < 0: # add -255 to pitches pitches = np.append(pitches, [-255] * -missing) elif missing > 0: # add -255 to ons ons = np.append(ons, [-255] * missing) offs = np.append(gt[score_type]['offsets'], [-255] * missing) if not len(offs): offs = np.full_like(ons, -255) vel = np.append(gt[score_type]['velocities'], [-255] * missing) if not len(vel): vel = np.full_like(ons, -255) missing = len(pitches) - len(vel) if missing < 0: # add -255 to pitches, ons and offs pitches = np.append(pitches, [-255] * -missing) ons = np.append(ons, [-255] * -missing) offs = np.append(offs, [-255] * -missing) elif missing > 0: # add -255 to vel vel = np.append(vel, [-255] * missing) num = np.full_like(ons, i) instr = np.full_like(ons, gt['instrument']) gt_mat = np.array([pitches, ons, offs, vel, instr, num]) mat.append(gt_mat) if len(mat) > 1: # mat now contains one list per each ground-truth, concatenating mat = np.concatenate(mat, axis=1) else: mat = np.array(mat[0]) # transposing: one row per note mat = mat.T # ordering by onset, pitch and offset (in this order) ind = np.lexsort([mat[:, 2], mat[:, 0], mat[:, 1]]) if return_notes: if return_notes == 'both': query = ['missing', 'extra'] else: query = [return_notes] # computing missing/extra notes returned_notes = [] for q in query: missing_extra = dataset.get_missing_extra_notes(idx, q) missing_extra = np.concatenate(missing_extra) returned_notes.append(missing_extra[ind]) return tuple([mat[ind]] + returned_notes) return mat[ind]
[docs]def intersect(*datasets, **kwargs): """ Takes datasets and returns a new dataset representing the intersection of them The datasets must have the same order in the `datasets` and `songs` (e.g. two datasets initialized in the same way and only filtered) This functions calls `filter` to populate the paths and returns them woth all the sources. However, you can pass any argument to `filter`, e.g. the `sources` argument """ assert len(datasets) > 0, "Cannot intersect no datasets" if len(datasets) == 1: return deepcopy(datasets[0]) out = datasets[0] for i in range(1, len(datasets)): out = _compare_dataset(_and_func, out, datasets[i], **kwargs) return out
[docs]def union(*datasets, **kwargs): """ Takes datasets and returns a new dataset representing the union of them The datasets must have the same order in the `datasets` and `songs` (e.g. two datasets initialized in the same way and only filtered) This functions calls `filter` to populate the paths and returns them woth all the sources. However, you can pass any argument to `filter`, e.g. the `sources` argument """ assert len(datasets) > 0, "Cannot unify no datasets" if len(datasets) == 1: return deepcopy(datasets[0]) out = datasets[0] for i in range(1, len(datasets)): out = _compare_dataset(_or_func, out, datasets[i], **kwargs) return out
[docs]def _compare_dataset(compare_func, dataset1, dataset2, **kwargs): """ Returns a new dataset where each song and dataset are included only if `compare_func` is True for each corresponding couplke of songs and datasets """ out = deepcopy(dataset1) out.paths = [] for i, d1 in enumerate(dataset1.datasets): d2 = dataset2.datasets[i] if compare_func(d1['included'], d2['included']): out.datasets[i]['included'] = True for j, s1 in enumerate(d1['songs']): s2 = d2['songs'][j] if compare_func(s1['included'], s2['included']): out.datasets[i]['songs'][j]['included'] = True else: out.datasets[i]['songs'][j]['included'] = False else: out.datasets[i]['included'] = False # populate paths return filter(out, **kwargs)
def _or_func(a, b): return a or b def _and_func(a, b): return a and b
[docs]def complement(dataset, **kwargs): """ Takes one dataset and returns a new dataset representing the complement of the input This functions calls `filter` to populate the paths and returns them woth all the sources. However, you can pass any argument to `filter`, e.g. the `sources` argument """ out = deepcopy(dataset) out.paths = [] for i, d in enumerate(dataset.datasets): include_dataset = False for j, s in enumerate(d['songs']): if s['included']: out.datasets[i]['songs'][j]['included'] = False else: out.datasets[i]['songs'][j]['included'] = True include_dataset = True if include_dataset: # some song has been included out.datasets[i]['included'] = True else: out.datasets[i]['included'] = False # populate paths return filter(out, **kwargs)
[docs]def get_pedaling_mat(dataset, idx, frame_based=False, winlen=0.046, hop=0.01): """ Get data about pedaling Arguments --------- idx : int The index of the song to retrieve. frame_based : bool If True, the output will contain one row per frame, otherwise one row per control changes event. Frames are deduced from `winlen` and `hop`. winlen : float The duration of a frame in seconds; only used if `frame_based` is True. hop : float The amount of hop-size in seconds; only used if `frame_based` is True. Returns ------- list[np.ndarry] : list of 2d-arrays each listing all the control changes events in a track. Rows represent control changes or frames (according to `frame_based_option`) while columns represent (time, sustain value, sostenuto value, soft value). If `frame_based` is used, time is the central time of the frame and frames are computed using the most aligned score available for this item. If `frame_based` is False, value -1 is used for pedaling type not affected in a certain control change (i.e. a control change affects one type of pedaling, so the other two will have value -1). The output is sorted by time. """ pedaling = [] for gt in dataset.get_gts(idx): # take all cc... cc_track_pedaling = [] for pedal in ['sustain', 'sostenuto', 'soft']: L = len(gt[pedal]['values']) if pedal == 'sustain': cc_track_pedaling += list( zip(gt[pedal]['times'], gt[pedal]['values'], [-1] * L, [-1] * L)) elif pedal == 'sostenuto': cc_track_pedaling += list( zip(gt[pedal]['times'], [-1] * L, gt[pedal]['values'], [-1] * L)) elif pedal == 'soft': cc_track_pedaling += list( zip(gt[pedal]['times'], [-1] * L, [-1] * L, gt[pedal]['values'])) # sort cc according to time... cc_track_pedaling.sort(key=lambda row: row[0]) cc_track_pedaling = np.array(cc_track_pedaling) if not frame_based: pedaling.append(cc_track_pedaling) else: # construct the frame-based output # compute the number of frames dur = dataset.get_score_duration(idx) n_frames = int(utils.nframes(dur, hop, winlen)) + 1 # set up initial matrix that will be output frame_track_pedaling = np.zeros((n_frames, 4), dtype=float) # doesn't work because shape suffers from precisions problems # frame_track_pedaling[:, 0] = np.arange(winlen / 2, hop * # n_frames + winlen / 2, hop) frame_track_pedaling[:, 0] = np.arange(n_frames) * hop + winlen / 2 # fill the matrix # rember the last value used for each column index: last_values = { 1: { "time": 0, "value": 0 }, 2: { "time": 0, "value": 0 }, 3: { "time": 0, "value": 0 }, } # parse the control changes for cc in cc_track_pedaling: # compute the frame relative to this cc frame_idx = utils.time2frame(cc[0], hop, winlen) # put all values from last cc to this one equal to the last # value type_of_cc = np.argmax(cc[1:]) + 1 frame_track_pedaling[ last_values[type_of_cc]["time"]:frame_idx, type_of_cc] = last_values[type_of_cc]["value"] # update the last value last_values[type_of_cc]["time"] = frame_idx last_values[type_of_cc]["value"] = cc[type_of_cc] # put all values from last cc to the end equal to the last # value if len(cc_track_pedaling) > 0: for type_of_cc in range(1, 4): frame_track_pedaling[ last_values[type_of_cc]["time"]:, type_of_cc] = last_values[type_of_cc]["value"] pedaling.append(np.array(frame_track_pedaling)) return pedaling