diff --git a/comsar/__init__.py b/comsar/__init__.py index ab321b99fc8a2d0e0ab3c69d6297ff5fa7cb1532..79a59e2dde46332565fb36682c1412e3830e7c0a 100644 --- a/comsar/__init__.py +++ b/comsar/__init__.py @@ -1 +1,6 @@ -from . track import TimbreTrack +import pkg_resources as _pkg + +from comsar.tracks.timbre import TimbreTrack +from comsar.tracks.utilities import TrackResult + +__version__ = _pkg.get_distribution('comsar').version diff --git a/comsar/cli/apollon_features.py b/comsar/cli/comsar_features.py similarity index 60% rename from comsar/cli/apollon_features.py rename to comsar/cli/comsar_features.py index fe6195b2110069b4a7717686e9eabb61e7bd2035..69cd00296be34917b50ec47f53340ebbbb98b46f 100644 --- a/comsar/cli/apollon_features.py +++ b/comsar/cli/comsar_features.py @@ -2,43 +2,19 @@ # Copyright (C) 2019 Michael Blaß # mblass@posteo.net - import argparse import itertools import json import sys import typing -import soundfile as sf import logging import time -from .. import analyses -from .. types import PathType -from .. import io -from .. audio import load_audio - -class ShortPiece(Exception): - pass - -class BadSampleRate(Exception): - pass - +from apollon import io -def _check_audio(path): - snd_info = sf.info(path) - if snd_info.duration < 30: - logging.error('Piece to short: {}'.format(path)) - # raise ShortPiece('Duration of {} is less than {} s.'.format(path, 30)) - return 10 - - if snd_info.samplerate != 44100: - logging.error('Bad sample rate: {}'.format(path)) - # raise BadSampleRate('Sample rate of {} Hz cannot be processed'.format(snd_info.samplerate)) - return 10 - return 0 def main(argv: dict = None) -> int: - logging.basicConfig(filename='fe.log', filemode='w', level=logging.DEBUG) + logging.basicConfig(filename='tt.log', filemode='w', level=logging.DEBUG) if argv is None: argv = sys.argv @@ -51,9 +27,6 @@ def main(argv: dict = None) -> int: if _check_audio(path) != 0: return 10 - snd = load_audio(path) - snd.cut(snd.fps*2, snd.size-(snd.fps*5)) - track_data = {} if argv.rhythm: track_data['rhythm'] = analyses.rhythm_track(snd) diff --git a/comsar/tracks/timbre.py b/comsar/tracks/timbre.py index 13f73ebccdc988a1265784d170e99c2b3c3114bb..14e173ba89b0f3255f57efa1d300ef23977a9515 100644 --- a/comsar/tracks/timbre.py +++ b/comsar/tracks/timbre.py @@ -1,102 +1,115 @@ +"""comsar/tracks/timbre.py -- TimbreTack implementation +License: BSD-3-Clasuse +Copyright (C) 2020, Michael Blaß, michael.blass@uni-hamburg.de +""" from dataclasses import dataclass -import pathlib from timeit import default_timer as timer +from typing import Optional import numpy as np import pandas as pd from apollon.audio import AudioFile -from apollon.segment import Segmentation, Segments -from apollon.signal.container import STParams +from apollon.segment import Segmentation +from apollon.signal import container, features from apollon.signal.spectral import StftSegments -from apollon.signal import features -from apollon.tools import standardize +from apollon.tools import time_stamp +import comsar +from comsar.tracks.utilities import TrackMeta, TrackParams, TrackResult, TimbreTrackParams + + +STFT_DEFAULT = container.StftParams(fps=44100, window='hamming', n_fft=None, + n_perseg=2**15, n_overlap=2**14, + extend=True, pad=True) + +CORR_DIM_DEFAULT = container.CorrDimParams(delay=14, m_dim=80, n_bins=1000, + scaling_size=10) + +CORR_GRAM_DEFAULT = container.CorrGramParams(wlen=2**10, n_delay=2**8, total=True) -segment_default = {'n_perseg': 2**15, 'n_overlap': 2**14, 'extend': True, 'pad': True} -cdim_default = {'delay': 14, 'm_dim': 80, 'n_bins': 1000, 'scaling_size': 10} -crr_default = {'wlen': 2**9, 'n_delay': 2**10, 'total': True} -@dataclass -class TTParams: - segment: dict - cdim: dict - crr: dict class TimbreTrack: """Compute timbre track of an audio file. """ - def __init__(self, path, segment_params: dict = segment_default, - cdim_params: dict = cdim_default, - crr_params: dict = crr_default) -> None: + def __init__(self, + stft_params: Optional[container.StftParams] = None, + corr_dim_params: Optional[container.CorrDimParams] = None, + corr_gram_params: Optional[container.CorrGramParams] = None) -> None: """ Args: - path: Path to audio file. - params: Feature computation parameters. """ - self.params = TTParams(segment_params, cdim_params, crr_params) - self.path = pathlib.Path(path) - snd = AudioFile(path) - cutter = Segmentation(**self.params.segment) - self.segments = cutter.transform(snd.data.squeeze()) - stp = STParams(snd.fps) - stft = StftSegments(stp) - self.spectrogram = stft.transform(self.segments) + self.params = TimbreTrackParams(stft_params or STFT_DEFAULT, + corr_dim_params or CORR_DIM_DEFAULT, + corr_gram_params or CORR_GRAM_DEFAULT) + + self.cutter = Segmentation(self.params.stft.n_perseg, + self.params.stft.n_overlap, + self.params.stft.extend, + self.params.stft.pad) + self.stft = StftSegments(self.params.stft.fps, self.params.stft.window, + self.params.stft.n_fft) + self.feature_names = ('Spectral Centroid', 'Spectral Spread', 'Spectral Flux', 'Roughness', 'Sharpness', 'SPL', 'Correlation Dimension', 'Correlogram') - self.funcs = [features.spectral_centroid, features.spectral_spread, - features.spectral_flux, features.roughness_helmholtz, - features.sharpness, features.spl, features.cdim, - features.correlogram] - assert len(self.feature_names) == len(self.funcs) + self.funcs = [features.spectral_centroid, + features.spectral_spread, + features.spectral_flux, + features.roughness_helmholtz, + features.sharpness, + features.spl, + features.cdim, + features.correlogram] - self._features = np.zeros((self.segments.n_segs, self.n_features)) self.pace = np.zeros(self.n_features) self.verbose = False - snd.close() @property def n_features(self) -> int: + """Number of features on track""" return len(self.feature_names) - @property - def features(self) pd.DataFrame: - if self._features is None: - return None - return pd.DataFrame(data=self._features, - columns=self.feature_names) - - @property - def z_score(self) -> pd.Dataframe: - if self._features is None: - return None - return standardize(self.features) - - - def extract(self) -> None: + def extract(self, path) -> pd.DataFrame: """Perform extraction. """ - args = [(self.spectrogram.frqs, self.spectrogram.power), - (self.spectrogram.frqs, self.spectrogram.power), - (self.spectrogram.abs,), - (self.spectrogram.d_frq, self.spectrogram.abs, 15000), - (self.spectrogram.frqs, self.spectrogram.abs), - (self.segments._segs,), - (self.segments._segs,), - (self.segments._segs,)] - - kwargs = [{}, {}, {}, {}, {}, {}, self.params.cdim, - self.params.crr] + snd = AudioFile(path) + if snd.fps != self.params.stft.fps: + snd.close() + raise ValueError('Sample rate of {snd!str} differs from init.') + + segs = self.cutter.transform(snd.data.squeeze()) + sxx = self.stft.transform(segs) + + args = [(sxx.frqs, sxx.power), + (sxx.frqs, sxx.power), + (sxx.abs,), + (sxx.d_frq, sxx.abs, 15000), + (sxx.frqs, sxx.abs), + (segs.data,), + (segs.data,), + (segs.data,)] + + kwargs = [{}, {}, {}, {}, {}, {}, self.params.corr_dim.to_dict(), + self.params.corr_gram.to_dict()] + + out = np.zeros((segs.n_segs, self.n_features)) for i, (fun, arg, kwarg) in enumerate(zip(self.funcs, args, kwargs)): - self._worker(i, fun, arg, kwarg) + out[:, i] = self._worker(i, fun, arg, kwarg) + snd.close() + + meta = TrackMeta(comsar.__version__, time_stamp(), snd.file_name) + out = pd.DataFrame(data=out, columns=self.feature_names) + return TrackResult(meta, self.params, out) - def _worker(self, idx, func, args, kwargs) -> None: + def _worker(self, idx, func, args, kwargs) -> np.ndarray: print(self.feature_names[idx], end=' ... ') pace = timer() - self._features[:, idx] = func(*args, **kwargs) + res = func(*args, **kwargs) pace = timer() - pace self.pace[idx] = pace print(f'{pace:.4} s.') + return res diff --git a/comsar/tracks/utilities.py b/comsar/tracks/utilities.py new file mode 100644 index 0000000000000000000000000000000000000000..549acbfe4f438eb3564b23c6c7ba984be62ef864 --- /dev/null +++ b/comsar/tracks/utilities.py @@ -0,0 +1,114 @@ +"""comsar/tracks/untilities.py -- Utilities +License: BSD-3-Clasuse +Copyright (C) 2020, Michael Blaß, michael.blass@uni-hamburg.de +""" +import pathlib +import pickle +from typing import ClassVar, Type, TypeVar, Union + +from dataclasses import dataclass +import numpy as np +import pandas as pd + +from apollon import io +from apollon import container +from apollon import signal +from apollon.tools import standardize +from apollon import types + + +T = TypeVar('T') + +@dataclass +class TrackMeta(container.Params): + """Track meta data.""" + _schema: ClassVar[types.Schema] = None + version: str + time_stamp: str + source: str + + +@dataclass +class TrackParams(container.Params): + """Track parameter base class.""" + _schema: ClassVar[types.Schema] = None + + +@dataclass +class TimbreTrackParams(TrackParams): + """Parameter set for TimbreTrack""" + stft: signal.container.StftParams + corr_dim: signal.container.CorrDimParams + corr_gram: signal.container.CorrGramParams + + +class TrackResult: + """Provide track results.""" + def __init__(self, meta: TrackMeta, params: TrackParams, + data: pd.DataFrame) -> None: + self._meta = meta + self._params = params + self._data = data + + @property + def data(self) -> np.ndarray: + """Return the raw data array.""" + return self._data.to_numpy() + + @property + def features(self) -> pd.DataFrame: + """Extracted feautures.""" + return self._data + + @property + def features_names(self) -> list: + """Name of each feature.""" + return self._data.columns.to_list() + + @property + def z_score(self) -> pd.DataFrame: + """Z-score of extracted features.""" + return standardize(self.features) + + def to_csv(self, path: Union[str, pathlib.Path]) -> None: + """Serialize features to csv file. + + This does not save parameters, and meta data. + + Args: + path: Destination path. + """ + self._data.to_csv(path) + + def to_dict(self) -> dict: + """Serialize TrackResults to dictionary.""" + return {'meta': self._meta.to_dict(), + 'params': self._params.to_dict(), + 'data': self._data.to_dict()} + + def to_json(self, path: Union[str, pathlib.Path]) -> None: + """Serialize TrackResults to JSON.""" + io.json.dump(self.to_dict(), path) + + + def to_pickle(self, path: Union[str, pathlib.Path]) -> None: + """Serialize Track Results to pickle.""" + path = pathlib.Path(path) + with path.open('wb') as fobj: + pickle.dump(self, fobj) + + @classmethod + def read_json(cls: Type[T], path: Union[str, pathlib.Path]) -> T: + """Read TrackResults form json.""" + raw = io.json.load(path) + meta = TrackMeta.from_dict(raw['meta']) + params = TimbreTrackParams.from_dict(raw['params']) + data = pd.DataFrame(raw['data']) + return cls(meta, params, data) + + @classmethod + def read_pickle(cls: Type[T], path: Union[str, pathlib.Path]) -> T: + """Read pickled TrackResults.""" + path = pathlib.Path(path) + with path.open('rb') as fobj: + return pickle.load(fobj) diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000000000000000000000000000000000000..e906c6b29b8a0d52219e3591aaf3b226fa2c4d1a --- /dev/null +++ b/mypy.ini @@ -0,0 +1,17 @@ +[mypy] +python_version = 3.7 +warn_return_any = True +warn_unused_configs = True + + +[mypy-hypothesis,hypothesis.*] +ignore_missing_imports = True + +[mypy-numpy] +ignore_missing_imports = True + +[mypy-pandas] +ignore_missing_imports = True + +[mypy-apollon.*] +ignore_missing_imports = True diff --git a/scripts/comsar b/scripts/comsar index 8fa932fa3c61ad18316ae5d0463e5c51dfa5e702..ac8544eb91ebc2f7600eb5e4f23b5bdf9e17d4b7 100755 --- a/scripts/comsar +++ b/scripts/comsar @@ -9,27 +9,34 @@ import sys import argparse -import apollon -from apollon import commands +from comsar import cli -_valid_subcommand = ('features', 'onsets', 'hmm', 'som', 'export', 'position') +_valid_subcommand = ('timbre', 'hmm', 'som', 'position') def _parse_cml(argv): parser = argparse.ArgumentParser('Computational Music and Sound Archiving') parser.add_argument( - '--version', action='version', version=apollon.__version__, - help='Display apollon version.') + '--version', action='version', version=comsar.__version__, + help='Display COMSAR version.') subparsers = parser.add_subparsers() + sp_tt = _create_subparser_timbretrack() + """ sp_features = _create_subparser_features(subparsers) sp_hmm = _create_subparser_hmm(subparsers) sp_position = _create_subparser_position(subparsers) - + """ return parser.parse_args(argv[1:]) +def _create_subparser_timbretrack(subparsers): + sp_tt = subparser.add_parser('timbre', help='Start timbre track') + sp_features.add_argument('files', type=str, nargs='+', help='Auio files.') + sp_features.set_defaults(func=cli.comsar_timbre_track.main) + +""" def _create_subparser_features(subparsers): sp_features = subparsers.add_parser('features', help='') sp_features.add_argument( @@ -104,15 +111,12 @@ def _create_subparser_position(subparsers): sp_position.set_defaults(func=commands.apollon_position.main) return sp_position - - +""" def main(argv=None): if argv is None: argv = sys.argv - args = _parse_cml(argv) return args.func(args) - if __name__ == '__main__': sys.exit(main()) diff --git a/tests/tracks/test_timbre.py b/tests/tracks/test_timbre.py new file mode 100644 index 0000000000000000000000000000000000000000..4287cb6858eca9fa840d82eba02c76264b2c5515 --- /dev/null +++ b/tests/tracks/test_timbre.py @@ -0,0 +1,19 @@ +import unittest +import numpy as np +import pandas as pd + +from apollon.tools import time_stamp +from comsar.tracks.timbre import (TimbreTrack, + STFT_DEFAULT, CORR_DIM_DEFAULT, CORR_GRAM_DEFAULT) +import comsar +from comsar.tracks.utilities import TrackMeta, TrackParams, TrackResult, TimbreTrackParams + + +class TestTimbreTrack(unittest.TestCase): + def setUp(self): + self.track = TimbreTrack() + + def test_nfeatures(self): + self.assertIsInstance(self.track.n_features, int) + + diff --git a/tests/tracks/test_utilities.py b/tests/tracks/test_utilities.py new file mode 100644 index 0000000000000000000000000000000000000000..5e85e29e3e0e6c80666313f07b4407afdf54741c --- /dev/null +++ b/tests/tracks/test_utilities.py @@ -0,0 +1,31 @@ +import os +import tempfile +import unittest + +from hypothesis import given + +from comsar import TrackResult +from comsar.tracks.timbre import TimbreTrackParams +from .. utils import timbre_track_results + + +class TestTrackResult(unittest.TestCase): + def setUp(self) -> None: + self.tf_descr, self.tf_name = tempfile.mkstemp(suffix='.json', + text=True) + + @given(timbre_track_results()) + def test_init(self, ttr) -> None: + self.assertIsInstance(ttr, TrackResult) + + @given(timbre_track_results()) + def test_to_dict(self, ttr) -> None: + self.assertIsInstance(ttr.to_dict(), dict) + + @given(timbre_track_results()) + def test_to_json(self, ttr) -> None: + ttr.to_json(self.tf_name) + + def tearDown(self) -> None: + os.unlink(self.tf_name) + os.close(self.tf_descr) diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..cd2c3b4c42359d6e4221013f493422c5ac5de9b7 --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,42 @@ +import string + +from hypothesis.strategies import (composite, integers, lists, sampled_from, + text, SearchStrategy) +from hypothesis.extra.numpy import arrays +import pandas as pd +from apollon.tools import time_stamp + +import comsar +from comsar.tracks import timbre +from comsar.tracks.utilities import TrackMeta, TimbreTrackParams, TrackResult + + +def ascii_strings() -> SearchStrategy: + """Strings made of ASCII letters and digits.""" + return text(sampled_from(string.ascii_letters+string.digits), + min_size=2, max_size=10) + +def lists_of_strings() -> SearchStrategy: + """Lists of unique ascii_strings.""" + return lists(ascii_strings(), min_size=2, max_size=10, unique=True) + +@composite +def numerical_dataframes(draw) -> pd.DataFrame: + """Generate pandas DataFrames. + + Each column is of type np.float64. Shape vaies between + (1, 2) and (1000, 10). + """ + names = draw(lists_of_strings()) + n_rows = draw(integers(min_value=1, max_value=1000)) + data = draw(arrays('float64', (n_rows, len(names)))) + return pd.DataFrame(data=data, columns=names) + +@composite +def timbre_track_results(draw) -> TrackResult: + meta = TrackMeta(comsar.__version__, time_stamp(), 'testfile.wav') + params = TimbreTrackParams(timbre.STFT_DEFAULT, + timbre.CORR_DIM_DEFAULT, + timbre.CORR_GRAM_DEFAULT) + data = draw(numerical_dataframes()) + return TrackResult(meta, params, data)