Commit 09ce71d2 authored by Michael Blaß

Histogram weight sample. And common interface.

parent 370c6e8f
@@ -11,11 +11,10 @@ from apollon.io import io as aio
 from apollon.som import defaults as _defaults
 from . import neighbors as _neighbors
 from . import utilities as asu
-from .. types import Array, Shape, Coord
+from .. types import Array, Shape, SomDims, Coord

 WeightInit = Union[Callable[[Array, Shape], Array], str]
 Metric = Union[Callable[[Array, Array], float], str]
-SomDims = Tuple[int, int, int]


 class SomGrid:

@@ -320,7 +319,7 @@ class IncrementalMap(SomBase):
                          seed=seed)

     def fit(self, train_data, verbose=False, output_weights=False):
-        self._weights = self.init_weights(train_data, self.shape)
+        self._weights = self.init_weights(self.dims, train_data)
         eta_ = asu.decrease_linear(self.init_eta, self.n_iter, _defaults.final_eta)
         nhr_ = asu.decrease_expo(self.init_nhr, self.n_iter, _defaults.final_nhr)
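The change to ``fit`` above reflects the common initializer interface introduced by this commit: every weight sampler now takes the SOM dimensions first and the (optional) training data second, so ``init_weights`` is called as ``init_weights(self.dims, train_data)`` instead of ``init_weights(train_data, self.shape)``. Note that the ``WeightInit`` alias still annotates the old ``(Array, Shape)`` signature. A minimal sketch of a callable conforming to the new convention; the name ``sample_zeros`` is illustrative and not part of the commit:

    # Hedged sketch: a custom initializer following the new
    # (dims, data, **kwargs) convention used throughout this commit.
    from typing import Optional

    import numpy as np

    from apollon.types import SomDims   # Tuple[int, int, int]; see the types diff further down


    def sample_zeros(dims: SomDims, data: Optional[np.ndarray] = None, **kwargs) -> np.ndarray:
        """Return all-zero weights: one row per map unit, one column per feature."""
        n_rows, n_cols, n_feats = dims
        return np.zeros((n_rows * n_cols, n_feats))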
@@ -7,13 +7,13 @@ Copyright (C) 2019 Michael Blaß
 mblass@posteo.net
 """
 import itertools
-from typing import Dict, Iterable, Iterator, List, Tuple
+from typing import Dict, Iterable, Iterator, List, Optional, Tuple

 import numpy as np
 from scipy.spatial import distance as _distance
 from scipy import stats as _stats

-from apollon.types import Array, Shape
+from apollon.types import Array, Shape, SomDims
 from apollon import tools

@@ -115,13 +115,13 @@ def best_match(weights: Array, inp: Array, metric: str):
     return dists.argmin(axis=0), dists.min(axis=0)
-def sample_pca(data: Array, shape: Shape, adapt: bool = True) -> Array:
+def sample_pca(dims: SomDims, data: Optional[Array] = None, **kwargs) -> Array:
     """Compute initial SOM weights by sampling from the first two principal
     components of the input data.

     Args:
+        dims: Dimensions of SOM.
         data: Input data set.
-        shape: Shape of SOM.
         adapt: If ``True``, the largest value of ``shape`` is applied to the
             principal component with the largest singular value. This
             orients the map, such that map dimension with the most units
@@ -130,36 +130,45 @@ def sample_pca(data: Array, shape: Shape, adapt: bool = True) -> Array:
     Returns:
         Array of SOM weights.
     """
+    n_rows, n_cols, n_feats = dims
+    n_units = n_rows * n_cols
+    if data is None:
+        data = np.random.randint(-100, 100, (300, n_feats))
     vals, vects, trans_data = tools.pca(data, 2)
     data_limits = np.column_stack((trans_data.min(axis=0),
                                    trans_data.max(axis=0)))
-    if adapt:
+    if 'adapt' in kwargs and kwargs['adapt'] is True:
         shape = sorted(shape, reverse=True)
-    dim_x = np.linspace(*data_limits[0], shape[0])
-    dim_y = np.linspace(*data_limits[1], shape[1])
+    dim_x = np.linspace(*data_limits[0], n_rows)
+    dim_y = np.linspace(*data_limits[1], n_cols)
     grid_x, grid_y = np.meshgrid(dim_x, dim_y)
     points = np.vstack((grid_x.ravel(), grid_y.ravel()))
     weights = points.T @ vects + data.mean(axis=0)
     return weights
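Two things stand out in the reworked ``sample_pca``: with ``data=None`` it draws a synthetic integer data set to run PCA on, and ``adapt`` has moved into ``**kwargs``; the ``adapt`` branch, however, still reads the removed ``shape`` parameter, so passing ``adapt=True`` fails on the undefined name as the code stands. A hedged usage sketch under the new signature; the expected output shapes assume ``tools.pca`` returns the two leading component vectors as rows:

    # Hedged sketch: calling sample_pca through the new (dims, data) interface.
    import numpy as np

    from apollon.som import utilities as asu

    data = np.random.rand(500, 4)            # 500 observations, 4 features
    dims = (8, 6, 4)                         # 8x6 map, 4-dimensional weight vectors

    weights = asu.sample_pca(dims, data)     # expected shape: (48, 4), one row per unit
    weights_synth = asu.sample_pca(dims)     # data=None: PCA runs on an internal random data set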

-def sample_rnd(data: Array, shape: Shape) -> Array:
+def sample_rnd(dims: SomDims, data: Optional[Array] = None, **kwargs) -> Array:
     """Compute initial SOM weights by sampling uniformly from the data space.

     Args:
-        data: Input data set
-        shape: Shape of SOM.
+        dims: Dimensions of SOM.
+        data: Input data set. If ``None``, sample from [-10, 10].

     Returns:
         Array of SOM weights.
     """
-    n_units = np.prod(shape)
-    data_limits = np.column_stack((data.max(axis=0), data.min(axis=0)))
+    n_rows, n_cols, n_feats = dims
+    n_units = n_rows * n_cols
+    if data is not None:
+        data_limits = np.column_stack((data.min(axis=0), data.max(axis=0)))
+    else:
+        data_limits = np.random.randint(-10, 10, (n_feats, 2))
+        data_limits.sort()
     weights = [np.random.uniform(*lim, n_units) for lim in data_limits]
     return np.column_stack(weights)
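``sample_rnd`` follows the same pattern: with data, the per-feature limits come from the data's minima and maxima; without data, integer limits are drawn from [-10, 10) and sorted. A short hedged sketch:

    # Hedged sketch: sample_rnd with and without an input data set.
    import numpy as np

    from apollon.som import utilities as asu

    dims = (5, 5, 3)                          # 25 units, 3 features
    data = np.random.rand(200, 3) * 4 - 2     # features roughly within [-2, 2]

    w_from_data = asu.sample_rnd(dims, data)  # limits taken per feature from the data
    w_data_free = asu.sample_rnd(dims)        # limits drawn as random integers in [-10, 10)
    assert w_from_data.shape == (25, 3) and w_data_free.shape == (25, 3)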

-def sample_stm(data: Array, shape: Shape):
+def sample_stm(dims: SomDims, data: Optional[Array] = None, **kwargs) -> Array:
     """Compute initial SOM weights by sampling stochastic matrices from
     Dirichlet distribution.
@@ -170,8 +179,8 @@ def sample_stm(data: Array, shape: Shape):
     The square root of the weight vectors' size must be a real integer.

     Args:
+        dims: Dimensions of SOM.
         data: Input data set.
-        shape: Shape of SOM.

     Returns:
         Array of SOM weights.
@@ -182,20 +191,35 @@ def sample_stm(data: Array, shape: Shape):
     are a discrete probability distribution forming the ``N``th row of
     the matrix.
     """
-    n_rows = np.sqrt(data.shape[1])
-    if bool(n_rows - int(n_rows)):
-        msg = (f'Weight vector with {n_rows} elements is not '
+    n_rows, n_cols, n_feats = dims
+    n_states = np.sqrt(n_feats)
+    if bool(n_states - int(n_states)):
+        msg = (f'Weight vector with {n_feats} elements is not '
               'reshapeable to square matrix.')
         raise ValueError(msg)
-    n_rows = int(n_rows)
-    n_units = np.prod(shape)
-    alpha = np.random.randint(1, 10, (n_rows, n_rows))
-    st_matrix = np.hstack([_stats.dirichlet.rvs(alpha=a, size=n_units)
+    n_states = int(n_states)
+    n_units = n_rows * n_cols
+    alpha = np.random.randint(1, 10, (n_states, n_states))
+    st_matrix = np.hstack([_stats.dirichlet(a).rvs(size=n_units)
                            for a in alpha])
     return st_matrix
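Since each ``sample_stm`` weight vector is a flattened square transition matrix, the number of features has to be a perfect square; reshaping one weight vector recovers a row-stochastic matrix, as the docstring describes. A hedged sketch:

    # Hedged sketch: sample_stm weights reshape to row-stochastic matrices.
    import numpy as np

    from apollon.som import utilities as asu

    dims = (4, 4, 9)                          # 9 = 3*3, so each weight is a 3x3 matrix
    weights = asu.sample_stm(dims)            # expected shape: (16, 9)

    first = weights[0].reshape(3, 3)
    assert np.allclose(first.sum(axis=1), 1)  # every row is a probability distribution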

+def sample_hist(dims: SomDims, data: Optional[Array] = None, **kwargs) -> Array:
+    """Sample sum-normalized histograms.
+
+    Args:
+        dims: Dimensions of SOM.
+        data: Input data set.
+
+    Returns:
+        Two-dimensional array in which each row is a histogram.
+    """
+    n_rows, n_cols, n_feats = dims
+    return _stats.dirichlet(np.ones(n_feats)).rvs(n_rows*n_cols)
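The new ``sample_hist`` draws every unit's weight vector from a flat Dirichlet distribution, so each row already sums to one; ``data`` is accepted only to satisfy the common interface and is not used. A hedged sketch:

    # Hedged sketch: sample_hist rows are sum-normalized histograms.
    import numpy as np

    from apollon.som import utilities as asu

    dims = (6, 5, 20)                         # 30 units, 20 histogram bins each
    weights = asu.sample_hist(dims)           # expected shape: (30, 20)
    assert np.allclose(weights.sum(axis=1), 1.0)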

 def distribute(bmu_idx: Iterable[int], n_units: int
                ) -> Dict[int, List[int]]:
     """List training data matches per SOM unit.
@@ -223,4 +247,5 @@ def distribute(bmu_idx: Iterable[int], n_units: int
 weight_initializer = {
     'rnd': sample_rnd,
     'stm': sample_stm,
-    'pca': sample_pca,}
+    'pca': sample_pca,
+    'hist': sample_hist}
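With all four samplers sharing the ``(dims, data=None, **kwargs)`` signature, the registry lets callers pick an initializer by name. The loop below is an illustrative sketch, not code from the commit:

    # Hedged sketch: resolving an initializer by its registry key.
    from apollon.som import utilities as asu

    dims = (10, 10, 16)                       # 16 features is also valid for 'stm' (4x4 matrices)
    for name in ('rnd', 'pca', 'stm', 'hist'):
        init = asu.weight_initializer[name]
        weights = init(dims)                  # every initializer accepts (dims, data=None, **kwargs)
        print(name, weights.shape)            # (100, 16) in each case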
@@ -20,6 +20,7 @@ PathGen = Generator[PathType, None, None]
 Schema = Dict[str, Collection[str]]
 Shape = Tuple[int, int]
+SomDims = Tuple[int, int, int]
 Coord = Tuple[int, int]
 AdIndex = Tuple[List[int], List[int]]
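``SomDims`` now lives in ``apollon.types`` next to the two-dimensional ``Shape`` alias: ``Shape`` describes only the map layout, while ``SomDims`` adds the weight-vector dimensionality as a third entry. A minimal sketch; the concrete values are illustrative:

    # Hedged sketch: Shape vs. the new SomDims alias.
    from apollon.types import Shape, SomDims

    shape: Shape = (10, 12)        # map layout: rows x columns
    dims: SomDims = (10, 12, 25)   # map layout plus number of features per weight vector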
 import unittest

 from hypothesis import strategies as hst
+from hypothesis import given
 import numpy as np
 from scipy.spatial import distance
+import scipy as sp

 from apollon.som import utilities as asu
+from apollon.types import SomDims
+
+dimension = hst.integers(min_value=2, max_value=50)
+som_dims = hst.tuples(dimension, dimension, dimension)
 """
 class TestMatch(unittest.TestCase):
     def setUp(self) -> None:
@@ -39,6 +42,24 @@ class TestDistribute(unittest.TestCase):
         self.assertIsInstance(res, dict)

+class TestSampleHist(unittest.TestCase):
+    def setUp(self) -> None:
+        pass
+
+    @given(som_dims)
+    def test_rows_are_stochastic(self, dims: SomDims) -> None:
+        weights = asu.sample_hist(dims)
+        comp = np.isclose(weights.sum(axis=1), 1)
+        self.assertTrue(comp.all())
+
+
+class TestSamplePca(unittest.TestCase):
+    def setUp(self) -> None:
+        pass
+
+    @given(som_dims)
+    def test_x(self, dims: SomDims) -> None:
+        weights = asu.sample_pca(dims)
 """
 class TestSelfOrganizingMap(unittest.TestCase):
     def setUp(self):
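The new tests draw SOM dimensions with hypothesis, each axis between 2 and 50. ``TestSampleHist`` asserts that every weight row sums to one, while ``TestSamplePca.test_x`` only calls ``sample_pca`` and so acts as a smoke test against exceptions. A hedged sketch of a shape property such a test could additionally check, written as a standalone function (a suggestion, not part of the commit; it assumes ``tools.pca`` returns component vectors as rows):

    # Hedged sketch: a possible shape property for sample_pca (illustrative only).
    from hypothesis import given, strategies as hst

    from apollon.som import utilities as asu

    dimension = hst.integers(min_value=2, max_value=50)
    som_dims = hst.tuples(dimension, dimension, dimension)


    @given(som_dims)
    def test_sample_pca_weight_shape(dims) -> None:
        n_rows, n_cols, n_feats = dims
        weights = asu.sample_pca(dims)
        assert weights.shape == (n_rows * n_cols, n_feats)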