diff --git a/src/apollon/aplot.py b/src/apollon/aplot.py
index 278c89dd2a9be67505770d7713fd5214149070ba..e91664914c4128fe6afd42c02e55b8ffc3f2813e 100644
--- a/src/apollon/aplot.py
+++ b/src/apollon/aplot.py
@@ -1,9 +1,10 @@
-# Licensed under the terms of the BSD-3-Clause license.
-# Copyright (C) 2019 Michael Blaß
-# mblass@posteo.net
+"""apollon/aplot.py
-"""
-aplot.py -- General plotting routines.
+General plotting routines.
+
+Licensed under the terms of the BSD-3-Clause license.
+Copyright (C) 2019 Michael Blaß
+mblass@posteo.net
Functions:
fourplot Create a four plot of time a signal.
@@ -12,8 +13,7 @@ Functions:
onest_decoding Plot decoded onsets over a signal.
signal Plot a time domain signal.
"""
-
-from typing import Optional, Tuple
+from typing import Iterable, Optional, Tuple, Union
import matplotlib.pyplot as _plt
import matplotlib.cm as _cm
@@ -22,32 +22,52 @@ from scipy import stats as _stats
from . import _defaults
from . import tools as _tools
-from . types import Array as _Array
+from . types import Array as _Array, Axis
Limits = Optional[Tuple[int, int]]
MplFig = Optional[_plt.Figure]
FigSize = Tuple[float, float]
SubplotPos = Optional[Tuple[int, int, int]]
+Axes = Union[Axis, Iterable[Axis]]
-def _nice_spines(ax, offset: int = 10) -> None:
+def outward_spines(axs: Axes, offset: float = 10.0) -> None:
"""Display only left and bottom spine and displace them.
+ Args:
+ axs: Axis or iterable of axes.
+ offset: Move the spines ``offset`` pixels in the negative direction.
+
Note:
Increasing ``offset`` may breaks the layout. Since the spine is moved,
- so is the axis label, which is in turn forced out of the figure's bounds.
+ so is the axis label, which is in turn forced out of the figure's
+ bounds.
+ """
+ for ax in _np.atleast_1d(axs).ravel():
+ ax.spines['left'].set_position(('outward', offset))
+ ax.spines['bottom'].set_position(('outward', offset))
+ ax.spines['top'].set_visible(False)
+ ax.spines['right'].set_visible(False)
+ ax.xaxis.set_ticks_position('bottom')
+ ax.yaxis.set_ticks_position('left')
+
+
+def center_spines(axs: Axes,
+ intersect: Tuple[float, float] = (0.0, 0.0)) -> None:
+ """Display axes in crosshair fashion.
Args:
- ax: Axes to be modified.
- offset: Move the spines ``offset`` pixels in the negative direction.
+ axs: Axis or iterable of axes.
+ intersect: Coordinate of axes' intersection point.
"""
- ax.spines['left'].set_position(('outward', offset))
- ax.spines['bottom'].set_position(('outward', offset))
- ax.spines['top'].set_visible(False)
- ax.spines['right'].set_visible(False)
- ax.xaxis.set_ticks_position('bottom')
- ax.yaxis.set_ticks_position('left')
+ for ax in _np.atleast_1d(axs).ravel():
+ ax.spines['left'].set_position(('axes', intersect[0]))
+ ax.spines['bottom'].set_position(('axes', intersect[1]))
+ ax.spines['top'].set_visible(False)
+ ax.spines['right'].set_visible(False)
+ ax.xaxis.set_ticks_position('bottom')
+ ax.yaxis.set_ticks_position('left')
def _new_axis(spines: str = 'nice', fig: MplFig = None, sp_pos: SubplotPos = None,
diff --git a/src/apollon/hmm/poisson/poisson_hmm.py b/src/apollon/hmm/poisson_hmm.py
similarity index 99%
rename from src/apollon/hmm/poisson/poisson_hmm.py
rename to src/apollon/hmm/poisson_hmm.py
index 975f83929b6dd0b05e42fb8355a739f00a3316a4..bf609ca15cbb0406674d2efb15ccf4cd450fbbe6 100644
--- a/src/apollon/hmm/poisson/poisson_hmm.py
+++ b/src/apollon/hmm/poisson_hmm.py
@@ -20,7 +20,7 @@ import warnings as _warnings
import numpy as _np
-import chains_addiction as _ca
+import chainsaddiction as _ca
import apollon
from apollon import types as _at
diff --git a/src/apollon/io/io.py b/src/apollon/io/io.py
index 7e273a5f1fe66ee758a90dca43c67afeef9b582b..6623b6be97f600d8cd8dd70b02ed8286644d9fd6 100644
--- a/src/apollon/io/io.py
+++ b/src/apollon/io/io.py
@@ -21,7 +21,7 @@ from typing import Any, Optional
import numpy as np
-from .. types import PathType
+from .. types import Array, PathType
from . json import ArrayEncoder
def generate_outpath(in_path: PathType,
@@ -177,8 +177,36 @@ def save_to_pickle(data: Any, path: PathType) -> None:
Args:
data: Pickleable object.
- path: Path to safe the file.
+ path: Path to save the file.
"""
path = pathlib.Path(path)
with path.open('wb') as file:
pickle.dump(data, file)
+
+
+def save_to_npy(data: Array, path: PathType) -> None:
+ """Save an array to numpy binary format without using pickle.
+
+ Args:
+ data: Numpy array.
+ path: Path to save the file.
+ """
+ path = pathlib.Path(path)
+ with path.open('wb') as file:
+ np.save(file, data, allow_pickle=False)
+
+
+def load_from_npy(path: PathType) -> Array:
+ """Load data from numpy's binary format.
+
+ Args:
+ path: File path.
+
+ Returns:
+ Data as numpy array.
+ """
+ path = pathlib.Path(path)
+ with path.open('rb') as file:
+ data = np.load(file, allow_pickle=False)
+ return data
+
diff --git a/src/apollon/som/datasets.py b/src/apollon/som/datasets.py
new file mode 100644
index 0000000000000000000000000000000000000000..c0c194a8d1ca503ab00b74608ce4e23345d41a79
--- /dev/null
+++ b/src/apollon/som/datasets.py
@@ -0,0 +1,42 @@
+"""apollon/som/datasets.py
+
+Licensed under the terms of the BSD-3-Clause license.
+Copyright (C) 2019 Michael Blaß
+mblass@posteo.net
+
+Function for generating test and illustration data sets.
+"""
+from typing import Optional, Tuple
+
+import numpy as np
+from scipy import stats
+
+def norm_circle(n_classes: int, n_per_class: int, class_std: int,
+ center: Tuple[int, int] = (0, 0), radius: int = 5,
+ seed: Optional[int] = None):
+ """Generate ``n_per_class`` samples from ``n_classes`` bivariate normal
+ distributions, each with standard deviation ``class_std``. The means
+ are equidistantly placed on a circle with radius ``radius``.
+
+ Args:
+ n_classes: Number of classes.
+ n_per_class: Number of samples in each class.
+ class_std: Standard deviation for every class.
+ center: Center of ther circle.
+ radius: Radius of the circle on which the means are placed.
+ seed: Set the random seed.
+
+ Returns:
+ Data set and target vector.
+ """
+ n_samples = n_classes * n_per_class
+ ang = np.pi * np.linspace(0, 360, n_classes, endpoint=False) / 180
+ xy_pos = np.stack((np.sin(ang), np.cos(ang)), axis=1)
+ xy_pos *= radius + np.asarray(center)
+
+ out = np.empty((n_samples, 2))
+ for i, pos in enumerate(xy_pos):
+ idx = slice(i*n_per_class, (i+1)*n_per_class)
+ distr = stats.multivariate_normal(pos, np.sqrt(class_std), seed=seed)
+ out[idx, :] = distr.rvs(n_per_class)
+ return out, np.arange(n_samples) // n_per_class
diff --git a/src/apollon/som/neighbors.py b/src/apollon/som/neighbors.py
index 631c600b92c671f09cd5fe1be1c4ce30e4cc83a8..2278de43555dd8e46769d6a1ac11525c293a49ad 100644
--- a/src/apollon/som/neighbors.py
+++ b/src/apollon/som/neighbors.py
@@ -9,10 +9,17 @@ Neighborhood computations
Functions:
gaussian N-Dimensional Gausian neighborhood.
"""
+from typing import List, Tuple
import numpy as np
from scipy.spatial import distance
+from .. types import Array
+
+Shape = Tuple[int, int]
+Coord = Tuple[int, int]
+AdIndex = Tuple[List[int], List[int]]
+
def gaussian(grid, center, radius):
"""Compute n-dimensional Gaussian neighbourhood.
@@ -62,6 +69,22 @@ def star(grid, center, radius):
return (dists <= radius).astype(int).T
+def neighborhood(grid, metric='sqeuclidean'):
+ """Compute n-dimensional cityblock neighborhood.
+
+ The cityblock neighborhood is a star-shaped area
+ around ``center``.
+
+ Params:
+ grid: Array of n-dimensional indices.
+ metric: Distance metric.
+
+ Returns:
+ Pairwise distances of map units.
+ """
+ return distance.squareform(distance.pdist(grid, metric))
+
+
def rect(grid, center, radius):
"""Compute n-dimensional Chebychev neighborhood.
@@ -79,3 +102,57 @@ def rect(grid, center, radius):
center = np.atleast_2d(center)
dists = distance.cdist(center, grid, 'chebychev')
return (dists <= radius).astype(int).T
+
+""" NNNNNNNNNEEEEEEEEEEWWWWWW STUFFFFFFFF """
+def gauss_kern(nhb, r):
+ return np.exp(-nhb/(r))
+
+
+def is_neighbour(cra: Array, crb: Array, grid: Array, metric: str) -> Array:
+ """Compute neighbourship between each coordinate in ``units_a`` abd
+ ``units_b`` on ``grid``.
+
+ Args:
+ cra: (n x 2) array of grid coordinates.
+ crb: (n x 2) array of grid coordinates.
+ grid: SOM grid array.
+ metric: Name of distance metric function.
+
+ Returns:
+ One-dimensional boolean array. ``True`` in position n means that the
+ points ``cra[n]`` and ``crb[n]`` are direct neighbours on ``grid``
+ regarding ``metric``.
+ """
+
+
+def check_bounds(shape: Shape, point: Coord) -> bool:
+ """Return ``True`` if ``point`` is valid index in ``shape``.
+
+ Args:
+ shape: Shape of two-dimensional array.
+ point: Two-dimensional coordinate.
+
+ Return:
+ True if ``point`` is within ``shape`` else ``False``.
+ """
+ return (0 <= point[0] < shape[0]) and (0 <= point[1] < shape[1])
+
+
+def direct_rect_nb(shape: Shape, point: Coord) -> AdIndex:
+ """Return the set of direct neighbours of ``point`` given rectangular
+ topology.
+
+ Args:
+ shape: Shape of two-dimensional array.
+ point: Two-dimensional coordinate.
+
+ Returns:
+ Advanced index of points in neighbourhood set.
+ """
+ nhb = []
+ for i in range(point[0]-1, point[0]+2):
+ for j in range(point[1]-1, point[1]+2):
+ if check_bounds(shape, (i, j)):
+ nhb.append((i, j))
+ return np.asarray(nhb)
+
diff --git a/src/apollon/som/plot.py b/src/apollon/som/plot.py
index d6673eabf12f9b3963016a35abdede94e043be8f..6b792106a3ab5ad73b4b13c7f6080acb5de70b72 100644
--- a/src/apollon/som/plot.py
+++ b/src/apollon/som/plot.py
@@ -4,13 +4,37 @@
"""apollon/som/plot.py
"""
-from typing import Tuple
+from typing import Optional, Tuple, Union
+import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
from apollon import tools
-from apollon.types import Array
+from apollon import aplot
+from apollon.types import Array, Axis, Shape
+
+
+def umatrix(ax: Axis, umx: Array, outline: bool = False, **kwargs) -> None:
+ """Plot the U-matrix.
+
+ Args:
+ ax: Axis subplot.
+ umx: U-matrix data.
+
+ Returns:
+ Image.
+ """
+ defaults = {
+ 'cmap': 'terrain',
+ 'levels': 20}
+
+ for k, v in kwargs.items():
+ _ = kwargs.setdefault(k, v)
+ ax.contourf(umx, **kwargs)
+ if outline:
+ ax.contour(umx, cmap='Greys_r', alpha=.7)
+ return ax
def plot_calibration(self, lables=None, ax=None, cmap='plasma', **kwargs):
@@ -83,31 +107,6 @@ def plot_qerror(self, ax=None, **kwargs):
label='Quantizationerror')
-def plot_umatrix(self, interp='None', cmap='viridis', ax=None, **kwargs):
- """Plot unified distance matrix.
-
- The unified distance matrix (udm) allows to visualize weight matrices
- of high dimensional weight vectors. The entries (x, y) of the udm
- correspondto the arithmetic mean of the distances between weight
- vector (x, y) and its 4-neighbourhood.
-
- Args:
- w: Neighbourhood width.
- interp: matplotlib interpolation method name.
- ax: Provide custom axis object.
-
- Returns:
- axis, umatrix
- """
- if ax is None:
- fig, ax = aplot._new_axis()
- udm = _som_utils.umatrix(self.weights, self.shape, metric=self.metric)
-
- ax.set_title('Unified distance matrix')
- ax.set_xlabel('# units')
- ax.set_ylabel('# units')
- ax.imshow(udm, interpolation=interp, cmap=cmap, origin='lower')
- return ax, udm
def plot_umatrix3d(self, w=1, cmap='viridis', **kwargs):
@@ -250,3 +249,96 @@ def weights_line(weights: Array, dims: Tuple, color: str = 'r',
ax.plot(wv, color=color)
return fig, axs
+
+
+def wire(ax: Axis, weights: Array, shape: Shape, *,
+ unit_size: Union[int, float, Array] = 100.0,
+ line_width: Union[int, float] = 1.0,
+ highlight: Optional[Array] = None, labels: bool = False, **kwargs):
+ """Plot the weight vectors of a SOM with two-dimensional feature space.
+
+ Neighbourhood relations are indicate by connecting lines.
+
+ Args:
+ ax: The axis subplot.
+ weights: SOM weigth matrix.
+ shape: SOM shape.
+ unit_size: Size for each unit.
+ line_width: Width of the wire lines.
+ highlight: Index of units to be marked in different color.
+ labels: If ``True``, attach a box with coordinates to each unit.
+
+ Returns:
+ vlines, hlines, bgmarker, umarker
+ """
+ unit_color = 'k'
+ bg_color = 'w'
+ hl_color = 'r'
+ alpha = .7
+
+ if isinstance(unit_size, np.ndarray):
+ marker_size = tools.scale(unit_size, 10, 110)
+ elif isinstance(unit_size, float) or isinstance(unit_size, int):
+ marker_size = np.repeat(unit_size, weights.shape[0])
+ else:
+ msg = (f'Argument of parameter ``unit_size`` must be real scalar '
+ 'or one-dimensional numpy array.')
+ raise ValueError(msg)
+ marker_size_bg = marker_size + marker_size / 100 * 30
+
+ if highlight is not None:
+ bg_color = np.where(highlight, hl_color, bg_color)
+
+ rsw = weights.reshape(*shape, 2)
+ vx, vy = rsw.T
+ hx, hy = np.rollaxis(rsw, 1).T
+ ax.set_aspect('equal')
+ vlines = ax.plot(vx, vy, unit_color, alpha=alpha, lw=line_width, zorder=9)
+ hlines = ax.plot(hx, hy, unit_color, alpha=alpha, lw=line_width, zorder=9)
+ bgmarker = ax.scatter(vx, vy, s=marker_size_bg, c=bg_color,
+ edgecolors='None', zorder=11)
+ umarker = ax.scatter(vx, vy, s=marker_size, c=unit_color, alpha=alpha,
+ edgecolors='None', zorder=12)
+
+ font = {'fontsize': 4,
+ 'va': 'bottom',
+ 'ha': 'center'}
+
+ bbox = {'alpha': 0.7,
+ 'boxstyle': 'round',
+ 'edgecolor': '#aaaaaa',
+ 'facecolor': '#dddddd',
+ 'linewidth': .5,
+ }
+
+ if labels is True:
+ for (px, py), (ix, iy) in zip(weights, np.ndindex(shape)):
+ ax.text(px+1.3, py, f'({ix}, {iy})', font, bbox=bbox, zorder=13)
+
+ return vlines, hlines, bgmarker, umarker
+
+
+def data_2d(ax: Axis, data: Array, colors: Array,
+ **kwargs) -> mpl.collections.PathCollection:
+ """Scatter plot a data set with two-dimensional feature space.
+
+ This just the usual scatter command with some reasonable defaults.
+
+ Args:
+ ax: The axis subplot.
+ data: The data set.
+ colors: Colors for each elemet in ``data``.
+
+ Returns:
+ PathCollection.
+ """
+ defaults = {
+ 'alpha': 0.2,
+ 'c': colors,
+ 'cmap': 'plasma',
+ 'edgecolors': 'None',
+ 's': 10}
+ for k, v in defaults.items():
+ _ = kwargs.setdefault(k, v)
+ aplot.outward_spines(ax)
+ return ax.scatter(*data.T, **kwargs)
diff --git a/src/apollon/som/som.py b/src/apollon/som/som.py
index f645b7670ba662480c1b9b4611dd24d9a5a90498..bf9188c588f9b29f79d44c4a9ed886af81913e98 100644
--- a/src/apollon/som/som.py
+++ b/src/apollon/som/som.py
@@ -1,186 +1,391 @@
# Licensed under the terms of the BSD-3-Clause license.
# Copyright (C) 2019 Michael Blaß
# mblass@posteo.net
+from typing import Dict, List, Optional, Tuple
-# apollon/som/som.py
-# SelfOrganizingMap module
-#
+import numpy as np
+from scipy.spatial import cKDTree
+from scipy.spatial import distance
-import itertools
-import numpy as _np
-import matplotlib.pyplot as _plt
-from scipy import stats as _stats
-from scipy.spatial import distance as _distance
-
-from apollon.io import io as _io
+from apollon.io import io as aio
from apollon.som import defaults as _defaults
from . import neighbors as _neighbors
-from . import utilities as _som_utils
-from .. types import Array as _Array
-from apollon.aplot import _new_axis, _new_axis_3d
-from apollon import aplot as aplot
-
-
-class _SomBase:
- def __init__(self, dims, n_iter, eta, nhr, nh_shape, init_distr, metric, mode, seed=None):
-
- # check dimensions
- for d in dims:
- if not isinstance(d, int) or not d >= 1:
- raise ValueError('Dimensions must be integer > 0.')
-
- self.dims = dims
- self.dx, self.dy, self.dw = self.dims
- self.shape = (self.dx, self.dy)
- self.n_units = self.dx * self.dy
- self.center = self.dx // 2, self.dy // 2
- self.whist = _np.zeros(self.n_units)
+from . import utilities as asu
+from .. types import Array, Shape, Coord
+
+
+class SomGrid:
+ def __init__(self, shape: Shape) -> None:
+
+ if not all(isinstance(val, int) and val >= 1 for val in shape):
+ raise ValueError('Dimensions must be integer > 0.')
+ self.shape = shape
+ self.pos = np.asarray(list(np.ndindex(shape)), dtype=int)
+ self.tree = cKDTree(self.pos)
+ self.rows, self.cols = np.indices(shape)
+
+ def nhb_idx(self, point: Coord, radius: float) -> Array:
+ """Compute the neighbourhood within ``radius`` around ``point``.
+
+ Args:
+ point: Coordinate in a two-dimensional array.
+ radius: Lenght of radius.
+
+ Returns:
+ Array of indices of neighbours.
+ """
+ return np.asarray(self.tree.query_ball_point(point, radius, np.inf))
+
+ def nhb(self, point: Coord, radius: float) -> Array:
+ """Compute neighbourhood within ``radius`` around ``pouint``.
+
+ Args:
+ point: Coordinate in a two-dimensional array.
+ radius: Lenght of radius.
+
+ Returns:
+ Array of positions of neighbours.
+ """
+ idx = self.nhb_idx(point, radius)
+ return self.pos[idx]
+
+ def __iter__(self):
+ for row, col in zip(self.rows.flat, self.cols.flat):
+ yield row, col
+
+ def rc(self):
+ return self.__iter__()
+
+ def cr(self):
+ for row, col in zip(self.rows.flat, self.cols.flat):
+ yield col, row
+
+
+class SomBase:
+ def __init__(self, dims: Tuple[int, int, int], n_iter: int, eta: float,
+ nhr: float, nh_shape: str, init_distr: str, metric: str,
+ seed: Optional[float] = None):
+
+ self._grid = SomGrid(dims[:2])
+ self.n_features = dims[2]
+ self._hit_counts = np.zeros(self.n_units)
self.n_iter = n_iter
- self.mode = mode
self.metric = metric
- self.isCalibrated = False
- self.calibration = None
- self.quantization_error = _np.zeros(n_iter)
+ self._qrr = np.zeros(n_iter)
+ self._trr = np.zeros(n_iter)
try:
self._neighbourhood = getattr(_neighbors, nh_shape)
except AttributeError:
- raise AttributeError(f'Neiborhood shape {nh_shape} is unknown. Use'
- 'one `gaussian`, `mexican`, `rect`, or `star`')
+ raise AttributeError(f'Neighborhood shape {nh_shape} is unknown.'
+ 'Use one `gaussian`, `mexican`, `rect`, or'
+ '`star`')
- # check training parameters
if eta is None:
self.init_eta = None
else:
- if (0 <= eta <= 1.):
+ if 0 < eta <= 1.:
self.init_eta = eta
else:
- raise ValueError('eta not in [0, 1]')
+ raise ValueError(f'Parameter ``eta``={self.init_eta} not in'
+ 'range [0, 1]')
- if nhr > 1:
+ if nhr >= 1:
self.init_nhr = nhr
- #self.final_nhr = max(self.dx, self.dy) / _defaults.nhr_scale_factor
else:
raise ValueError('Neighbourhood radius must be int > 0.')
if seed is not None:
- _np.random.seed(seed)
+ np.random.seed(seed)
if init_distr == 'uniform':
- self.weights = _np.random.uniform(0, 1, size=(self.n_units, self.dw))
+ self._weights = np.random.uniform(0, 1,
+ size=(self.n_units, self.dw))
elif init_distr == 'simplex':
- self.weights = _som_utils.init_simplex(self.dw, self.n_units)
+ self._weights = asu.init_simplex(self.dw, self.n_units)
elif init_distr == 'pca':
raise NotImplementedError
else:
raise ValueError(f'Unknown initializer "{init_distr}". Use'
'"uniform", "simplex", or "pca".')
- self._grid = _som_utils.grid(self.dx, self.dy)
+ self._dists: Optional[Array] = None
+
+ @property
+ def dims(self) -> Tuple[int, int, int]:
+ """Return the SOM dimensions."""
+ return (*self._grid.shape, self.n_features)
+
+ @property
+ def dx(self) -> int:
+ """Return the number of units along the first dimension."""
+ return self._grid.shape[0]
+
+ @property
+ def dy(self) -> int:
+ """Return the number of units along the second dimension."""
+ return self._grid.shape[1]
+
+ @property
+ def dw(self) -> int:
+ """Return the dimension of the weight vectors."""
+ return self.n_features
+
+ @property
+ def n_units(self) -> int:
+ """Return the total number of units on the SOM."""
+ return self.dx * self.dy
+
+ @property
+ def shape(self) -> Shape:
+ """Return the map shape."""
+ return self._grid.shape
+
+ @property
+ def grid(self) -> Array:
+ """Return the grid."""
+ return self._grid
+
+ @property
+ def dists(self) -> Array:
+ """Return the distance matrix of the grid points."""
+ return self._dists
+
+ @property
+ def weights(self) -> Array:
+ """Return the weight vectors."""
+ return self._weights
+
+ @property
+ def hit_counts(self) -> Array:
+ """Return total hit counts for each SOM unit."""
+ return self._hit_counts
+
+ @property
+ def quantization_error(self) -> Array:
+ """Return quantization error."""
+ return self._qrr
+
+ @property
+ def topographic_error(self) -> Array:
+ """Return topographic error."""
+ return self._trr
+
+ def calibrate(self, data: Array, target: Array) -> Array:
+ """Retrieve the target value of the best matching input data vector
+ for each unit weight vector.
+
+ Args:
+ data: Input data set.
+ target: Target labels.
+
+ Returns:
+ Array of target values.
+ """
+ bm_dv, _ = asu.best_match(data, self._weights, self.metric)
+ return target[bm_dv]
+
+ def distribute(self, data: Array) -> Dict[int, List[int]]:
+ """Distribute the vectors of ``data`` on the SOM.
+ Indices of vectors n ``data`` are mapped to the index of
+ their best matching unit.
+
+ Args:
+ data: Input data set.
+
+ Returns:
+ Dictionary with SOM unit indices as keys. Each key maps to a list
+ that holds the indices of rows in ``data``, which best match this
+ key.
+ """
+ return asu.distribute(self.match(data), self.n_units)
- def calibrate(self, data, targets):
- """Retriev for every map unit the best matching vector of the input
- data set. Save its target value at the map units position on a
- new array called `calibration`.
+ def match_flat(self, data: Array) -> Array:
+ """Return the index of the best matching unit for each vector in
+ ``data``.
Args:
- data: Input data set.
- targets: Target labels.
+ data: Input data set.
+
+ Returns:
+ Array of SOM unit indices.
"""
- bmiv, err = self.get_winners(data, argax=0)
- self._cmap = targets[bmiv]
- self.isCalibrated = True
+ bmu, _ = asu.best_match(self._weights, data, self.metric)
+ return bmu
+ def match(self, data: Array) -> Array:
+ """Return the multi_index of the best matching unit for each vector in
+ ``data``.
+ Args:
+ data: Input data set.
+
+ Returns:
+ Array of SOM unit indices.
+ """
+ bmu = self.match_flat(data)
+ pos_y, pos_x = np.unravel_index(bmu, self.shape)
+ return np.column_stack((pos_x, pos_y))
- def save(self, path):
+ def predict(self, data: Array) -> Array:
+ """Predict the SOM index of the best matching unit
+ for each item in ``data``.
+
+ Args:
+ data: Input data. Rows are items, columns are features.
+
+ Returns:
+ One-dimensional array of indices.
+ """
+ bmi, _ = asu.best_match(self.weights, data, self.metric)
+ return bmi
+
+ def save(self, path) -> None:
"""Save som object to file using pickle.
Args:
path: Save SOM to this path.
"""
- _io.save_to_pickle(self, path)
+ aio.save_to_pickle(self, path)
+ def save_weights(self, path) -> None:
+ """Save weights only.
- def transform(self, data, flat=True):
- """Transform input data to feature space.
+ Args:
+ path: File path
+ """
+ aio.save_to_npy(self._weights, path)
+
+ def transform(self, data: Array) -> Array:
+ """Transform each item in ``data`` to feature space.
+
+ This, in principle, returns best matching unit's weight vectors.
Args:
- data: 2d array of shape (N_vect, N_features).
- flat: Return flat index of True else 2d multi index.
+ data: Input data. Rows are items, columns are features.
Returns:
Position of each data item in the feature space.
"""
- bmu, err = self.get_winners(data)
+ bmi = self.predict(data)
+ return self.weights[bmi]
- if flat:
- return bmu
- else:
- midx = _np.unravel_index(bmu, (self.dx, self.dy))
- return _np.array(midx)
+ def umatrix(self, radius: int = 1, scale: bool = True, norm: bool = True):
+ """Compute U-matrix of SOM instance.
+ Args:
+ radius: Map neighbourhood radius.
+ scale: If ``True``, scale each U-height by the number of the
+ associated unit's neighbours.
+ norm: Normalize U-matrix if ``True``.
-class BatchMap(_SomBase):
+ Returns:
+ Unified distance matrix.
+ """
+ u_height = np.empty(self.n_units, dtype='float64')
+ nhd_per_unit = self._grid.nhb_idx(self._grid.pos, radius)
+ for i, nhd_idx in enumerate(nhd_per_unit):
+ cwv = self._weights[[i]]
+ nhd = self._weights[nhd_idx]
+ u_height[i] = distance.cdist(cwv, nhd, self.metric).sum()
+ if scale:
+ u_height[i] /= len(nhd_idx)
+ if norm:
+ umax = u_height.max()
+ if umax == 0:
+ u_height = np.zeros_like(u_height)
+ else:
+ u_height /= u_height.max()
+ return u_height.reshape(self.shape)
+
+
+class BatchMap(SomBase):
def __init__(self, dims: tuple, n_iter: int, eta: float, nhr: float,
nh_shape: str = 'gaussian', init_distr: str = 'uniform',
metric: str = 'euclidean', seed: int = None):
super().__init__(dims, n_iter, eta, nhr, nh_shape, init_distr, metric,
- mode='batch', seed=seed)
+ seed=seed)
-class IncrementalMap(_SomBase):
+class IncrementalMap(SomBase):
def __init__(self, dims: tuple, n_iter: int, eta: float, nhr: float,
nh_shape: str = 'gaussian', init_distr: str = 'uniform',
metric: str = 'euclidean', seed: int = None):
super().__init__(dims, n_iter, eta, nhr, nh_shape, init_distr, metric,
- mode='incremental', seed=seed)
+ seed=seed)
+ def fit(self, train_data, verbose=False, output_weights=False):
+ eta_ = asu.decrease_linear(self.init_eta, self.n_iter, _defaults.final_eta)
+ nhr_ = asu.decrease_expo(self.init_nhr, self.n_iter, _defaults.final_nhr)
- def fit(self, train_data, verbose=False):
- eta_ = _som_utils.decrease_linear(self.init_eta, self.n_iter, _defaults.final_eta)
- nhr_ = _som_utils.decrease_expo(self.init_nhr, self.n_iter, _defaults.final_nhr)
- iter_ = range(self.n_iter)
-
- for (c_iter, c_eta, c_nhr) in zip(iter_, eta_, nhr_):
+ np.random.seed(10)
+ for (c_iter, c_eta, c_nhr) in zip(range(self.n_iter), eta_, nhr_):
if verbose:
print('iter: {:2} -- eta: {:<5} -- nh: {:<6}' \
- .format(c_iter, _np.round(c_eta, 4), _np.round(c_nhr, 5)))
-
- for fvect in _np.random.permutation(train_data):
- bmu, err = _som_utils.best_match(self.weights, fvect, self.metric)
- self.whist[bmu] += 1
- self.quantization_error[c_iter] += err
+ .format(c_iter, np.round(c_eta, 4), np.round(c_nhr, 5)))
- m_idx= _np.atleast_2d(_np.unravel_index(bmu, self.shape)).T
- neighbors = self._neighbourhood(self._grid, m_idx, c_nhr)
- self.weights += c_eta * neighbors * (fvect - self.weights)
+ for i, fvect in enumerate(np.random.permutation(train_data)):
+ if output_weights:
+ fname = f'weights/weights_{c_iter:05}_{i:05}.npy'
+ with open(fname, 'wb') as fobj:
+ np.save(fobj, self._weights, allow_pickle=False)
+ bmu, err = asu.best_match(self.weights, fvect, self.metric)
+ self._hit_counts[bmu] += 1
+ m_idx = np.atleast_2d(np.unravel_index(bmu, self.shape)).T
+ neighbors = self._neighbourhood(self._grid.pos, m_idx, c_nhr)
+ self._weights += c_eta * neighbors * (fvect - self._weights)
+ _, err = asu.best_match(self.weights, train_data, self.metric)
+ self._qrr[c_iter] = err.sum() / train_data.shape[0]
-class SelfOrganizingMap(_SomBase):
+class IncrementalKDTReeMap(SomBase):
def __init__(self, dims: tuple, n_iter: int, eta: float, nhr: float,
- nh_shape: str = 'gaussian', init_distr: str = 'uniform',
- metric: str = 'euclidean', mode: str = 'incremental',
- seed: int = None):
+ nh_shape: str = 'star2', init_distr: str = 'uniform',
+ metric: str = 'euclidean', seed: int = None):
- super().__init__(dims, n_iter, eta, nhr, nh_shape, init_distr, metric, mode, seed)
+ super().__init__(dims, n_iter, eta, nhr, nh_shape, init_distr, metric,
+ seed=seed)
+ def fit(self, train_data, verbose=False):
+ """Fit SOM to input data."""
+ eta_ = asu.decrease_linear(self.init_eta, self.n_iter, _defaults.final_eta)
+ nhr_ = asu.decrease_expo(self.init_nhr, self.n_iter, _defaults.final_nhr)
+ iter_ = range(self.n_iter)
+ np.random.seed(10)
+ for (c_iter, c_eta, c_nhr) in zip(iter_, eta_, nhr_):
+ if verbose:
+ print('iter: {:2} -- eta: {:<5} -- nh: {:<6}' \
+ .format(c_iter, np.round(c_eta, 4), np.round(c_nhr, 5)))
+
+ for fvect in np.random.permutation(train_data):
+ bmu, _ = asu.best_match(self.weights, fvect, self.metric)
+ self._hit_counts[bmu] += 1
+ nh_idx = self._grid.nhb_idx(np.unravel_index(*bmu, self.shape), c_nhr)
+ #dists = _distance.cdist(self._grid.pos[nh_idx], self._grid.pos[bmu])
+ dists = np.ones(nh_idx.shape[0])
+ kern = _neighbors.gauss_kern(dists.ravel(), c_nhr) * c_eta
+ self._weights[nh_idx] += ((fvect - self._weights[nh_idx]) * kern[:, None])
+
+ _, err = asu.best_match(self.weights, train_data, self.metric)
+ self._qrr[c_iter] = err.sum() / train_data.shape[0]
+
+'''
def _batch_update(self, data_set, c_nhr):
# get bmus for vector in data_set
bm_units, total_qE = self.get_winners(data_set)
self.quantization_error.append(total_qE)
# get bmu's multi index
- bmu_midx = _np.unravel_index(bm_units, self.shape)
+ bmu_midx = np.unravel_index(bm_units, self.shape)
- w_nh = _np.zeros((self.n_units, 1))
- w_lat = _np.zeros((self.n_units, self.dw))
+ w_nh = np.zeros((self.n_units, 1))
+ w_lat = np.zeros((self.n_units, self.dw))
for bx, by, fv in zip(*bmu_midx, data_set):
# TODO: Find a way for faster nh computation
@@ -202,88 +407,10 @@ class SelfOrganizingMap(_SomBase):
# main loop
for (c_iter, c_nhr) in \
zip(range(self.n_iter),
- _som_utils.decrease_linear(self.init_nhr, self.n_iter)):
+ asu.decrease_linear(self.init_nhr, self.n_iter)):
if verbose:
print(c_iter, end=' ')
self._batch_update(data, c_nhr)
-
-
- def predict(self, data):
- """Predict a class label for each item in input data. SOM needs to be
- calibrated in order to predict class labels.
- """
- if self.isCalibrated:
- midx = self.transform(data)
- return self._cmap[midx]
- else:
- raise AttributeError('SOM is not calibrated.')
-
-
-
-class DotSom(_SomBase):
- def __init__(self, dims=(10, 10, 3), eta=.8, nh=8, n_iter=10,
- metric='euclidean', mode=None, init_distr='uniform', seed=None):
- """ This SOM assumes a stationary PoissonHMM on each unit. The weight vector
- represents the HMMs distribution parameters in the following order
- [lambda1, ..., lambda_m, gamma_11, ... gamma_mm]
-
- Args:
- dims (tuple) dx, dy, m
- """
- super().__init__(dims, eta, nh, n_iter, metric, mode, init_distr, seed)
- self._neighbourhood = self.nh_gaussian_L2
-
- def get_winners(self, data, argax=1):
- """Get the best matching neurons for every vector in data.
-
- Args:
- data: Input data set
- argax: Axis used for minimization 1=x, 0=y.
-
- Returns:
- Indices of bmus and min dists.
- """
- # TODO: if the distance between an input vector and more than one lattice
- # neuro is the same, choose winner randomly.
-
- d = _np.inner(data, self.weights)
- return _np.argmax(d), 0
-
-
-
- def fit(self, data, verbose=True):
- for (c_iter, c_eta, c_nhr) in \
- zip(range(self.n_iter),
- _som_utils.decrease_linear(self.init_eta, self.n_iter, _defaults.final_eta),
- _som_utils.decrease_expo(self.init_nhr, self.n_iter, _defaults.final_nhr)):
-
- if verbose:
- print('iter: {:2} -- eta: {:<5} -- nh: {:<6}' \
- .format(c_iter, _np.round(c_eta, 4), _np.round(c_nhr, 5)))
-
- # always shuffle data
- self._incremental_update(_np.random.permutation(data), c_eta, c_nhr)
-
-
- def _incremental_update(self, data_set, c_eta, c_nhr):
- total_qE = 0
- for fv in data_set:
- bm_units, c_qE = self.get_winners(fv)
- total_qE += c_qE
-
- # update activation map
- self.whist[bm_units] += 1
-
- # get bmu's multi index
- bmu_midx = _np.unravel_index(bm_units, self.shape)
-
- # calculate neighbourhood over bmu given current radius
- c_nh = self._neighbourhood(bmu_midx, c_nhr)
-
- # update lattice
- u = self.weights + c_eta * fv
- self.weights = u / _distance.norm(u)
-
- self.quantization_error.append(total_qE)
+ '''
diff --git a/src/apollon/som/topologies.py b/src/apollon/som/topologies.py
index 525ee480a7bd107d5fe82a171a6f6e92f3c1ce10..13c0877d45eaa49d134e8fc3ca5a153984642bca 100644
--- a/src/apollon/som/topologies.py
+++ b/src/apollon/som/topologies.py
@@ -9,32 +9,10 @@
Topologies for self-organizing maps.
Functions:
- rect_neighbourhood Return rectangular neighbourhood.
vn_neighbourhood Return 4-neighbourhood.
"""
-
-import numpy as _np
-
-
-def rect_neighbourhood(mat_shape, point, w=1):
- if point[0] - w < 0:
- rows1 = 0
- else:
- rows1 = point[0] - w
- rows2 = point[0] + w + 1
-
- if point[1] - w < 0:
- cols1 = 0
- else:
- cols1 = point[1] - w
- cols2 = point[1] + w + 1
-
- mask = _np.ones(mat_shape)
- mask[rows1:rows2, cols1:cols2] = 0
- mask[point] = 1
- out = _np.ma.masked_array(mask, mask=mask)
- return out
+import numpy as np
def vn_neighbourhood(x, y, dx, dy, flat=False):
@@ -67,7 +45,7 @@ def vn_neighbourhood(x, y, dx, dy, flat=False):
nh.append((x, y+1))
if flat:
- nh = _np.array(nh)
- return _np.ravel_multi_index(nh.T, (dx, dy))
+ nh = np.array(nh)
+ return np.ravel_multi_index(nh.T, (dx, dy))
else:
return nh
diff --git a/src/apollon/som/utilities.py b/src/apollon/som/utilities.py
index 4a5a28e136a979f657bfbe744992285b285aab63..6db07069656474f5f9cc84dda77085aa184f5708 100644
--- a/src/apollon/som/utilities.py
+++ b/src/apollon/som/utilities.py
@@ -1,27 +1,20 @@
-# Licensed under the terms of the BSD-3-Clause license.
-# Copyright (C) 2019 Michael Blaß
-# mblass@posteo.net
-
-"""apollon/som/uttilites.py
+"""apollon/som/utilites.py
Utilities for self.organizing maps.
-Functions:
- activation_map Plot activation map
- distance_map Plot a distance map
- distance_map3d Plot a 3d distance map
+Licensed under the terms of the BSD-3-Clause license.
+Copyright (C) 2019 Michael Blaß
+mblass@posteo.net
"""
import itertools
-from typing import Iterator, Tuple
+from typing import Dict, Iterable, Iterator, List, Tuple
-import matplotlib.pyplot as _plt
-from mpl_toolkits.mplot3d import Axes3D
-import numpy as _np
+import numpy as np
from scipy.spatial import distance as _distance
from scipy import stats as _stats
-import apollon.som.topologies as _topologies
-from apollon.types import Array
+from apollon.types import Array, Shape
+from apollon import tools
def grid_iter(n_rows: int, n_cols: int) -> Iterator[Tuple[int, int]]:
@@ -47,17 +40,11 @@ def grid(n_rows: int, n_cols: int) -> Array:
Returns:
Two-dimensional array in which each row represents an multi-index.
"""
- return _np.array(list(grid_iter(n_rows, n_cols)))
-
-
-def activation_map(som, **kwargs):
- ax = _plt.gca()
- am = som.activation_map.reshape(som.shape[:2])
- ax.imshow(_np.flipud(am), vmin=0, vmax=som.activation_map.max(), **kwargs)
+ return np.array(list(grid_iter(n_rows, n_cols)))
def decrease_linear(start: float, step: float, stop: float = 1.0
- ) -> Iterator[float]:
+ ) -> Iterator[float]:
"""Linearily decrease ``start`` in ``step`` steps to ``stop``."""
if step < 1 or not isinstance(step, int):
raise ValueError('Param `step` must be int >= 1.')
@@ -69,18 +56,25 @@ def decrease_linear(start: float, step: float, stop: float = 1.0
yield a * x + start
-def decrease_expo(start: float, step: float,stop: float = 1.0
- ) -> Iterator[float]:
+def decrease_expo(start: float, step: float, stop: float = 1.0
+ ) -> Iterator[float]:
"""Exponentially decrease ``start`` in ``step`` steps to ``stop``."""
if step < 1 or not isinstance(step, int):
raise ValueError('Param `step` must be int >= 1.')
elif step == 1:
yield start
else:
- b = _np.log(stop / start) / (step-1)
+ b = np.log(stop / start) / (step-1)
for x in range(step):
- yield start * _np.exp(b*x)
+ yield start * np.exp(b*x)
+"""
+def match(weights: Array, data: Array, kth, metric: str):
+ dists = _distance.cdist(weights, data, metric)
+ idx = dists.argpartition(kth, axis=0)
+ min_vals = dists[min_idx]
+ return (min_idx, min_vals)
+"""
def best_match(weights: Array, inp: Array, metric: str):
"""Compute the best matching unit of ``weights`` for each
@@ -101,78 +95,126 @@ def best_match(weights: Array, inp: Array, metric: str):
Index and error of best matching units.
"""
if weights.ndim != 2:
- raise ValueError(f'Array ``weights`` has {weights.ndim} dimensions, it'
- 'has to have exactly two dimensions.')
+ msg = (f'Array ``weights`` has {weights.ndim} dimensions, it '
+ 'has to have exactly two dimensions.')
+ raise ValueError(msg)
if weights.shape[-1] != inp.shape[-1]:
- raise ValueError(f'Feature dimension of ``weights`` has '
- '{weights.shape[0]} elemets, whereas ``inp`` has {inp.shape[-1]} '
- 'elemets. but they have, however, ' 'to match exactly.')
+ msg = (f'Feature dimension of ``weights`` has {weights.shape[0]} '
+ 'elemets, whereas ``inp`` has {inp.shape[-1]} elemets. '
+ 'However, both dimensions have to match exactly.')
+ raise ValueError(msg)
- inp = _np.atleast_2d(inp)
+ inp = np.atleast_2d(inp)
if inp.ndim > 2:
- raise ValueError(f'Array ``inp`` has {weights.ndim} dimensions, it '
- 'has to have one or two dimensions.')
+ msg = (f'Array ``inp`` has {weights.ndim} dimensions, it '
+ 'has to have one or two dimensions.')
+ raise ValueError(msg)
dists = _distance.cdist(weights, inp, metric)
return dists.argmin(axis=0), dists.min(axis=0)
-def umatrix(weights, dxy, metric='euclidean'):
- """Compute unified distance matrix.
+def sample_pca(data: Array, shape: Shape, adapt: bool = True) -> Array:
+ """Compute initial SOM weights by sampling from the first two principal
+ components of the input data.
Args:
- weights: SOM weights matrix.
- dxy:
- metric: Metric to use.
+ data: Input data set.
+ shape: Shape of SOM.
+ adapt: If ``True``, the largest value of ``shape`` is applied to the
+ principal component with the largest sigular value. This
+ orients the map, such that map dimension with the most units
+ coincides with principal component with the largest variance.
Returns:
- Unified distance matrix.
+ Array of SOM weights.
"""
- out = _np.empty(dxy, dtype='float64')
+ vals, vects, trans_data = tools.pca(data, 2)
+ data_limits = np.column_stack((trans_data.min(axis=0),
+ trans_data.max(axis=0)))
+ if adapt:
+ shape = sorted(shape, reverse=True)
+ dim_x = np.linspace(*data_limits[0], shape[0])
+ dim_y = np.linspace(*data_limits[1], shape[1])
+ grid_x, grid_y = np.meshgrid(dim_x, dim_y)
+ points = np.vstack((grid_x.ravel(), grid_y.ravel()))
+ weights = points.T @ vects + data.mean(axis=0)
+ return weights
+
+
+def sample_rnd(data: Array, shape: Shape) -> Array:
+ """Compute initial SOM weights by sampling uniformly from the data space.
- for i, mi in enumerate(_np.ndindex(dxy)):
- nh_flat_idx = _topologies.vn_neighbourhood(*mi, *dxy, flat=True)
- p = weights[i][None]
- nh = weights[nh_flat_idx]
- out[mi] = _distance.cdist(p, nh).sum() / len(nh)
+ Args:
+ data: Input data set
+ shape: Shape of SOM.
- return out / out.max()
+ Returns:
+ Array of SOM weights.
+ """
+ n_units = np.prod(shape)
+ data_limits = np.column_stack((data.max(axis=0), data.min(axis=0)))
+ weights = [np.random.uniform(*lim, n_units) for lim in data_limits]
+ return np.column_stack(weights)
-def init_simplex(n_features, n_units):
- """Initialize the weights with stochastic matrices.
+def sample_stm(data: Array, shape: Shape):
+ """Compute initial SOM weights by sampling stochastic matrices from
+ Dirichlet distribution.
The rows of each n by n stochastic matrix are sampes drawn from the
Dirichlet distribution, where n is the number of rows and cols of the
matrix. The diagonal elemets of the matrices are set to twice the
probability of the remaining elements.
- The square root n of the weight vectors' size must be element of the
- natural numbers, so that the weight vector is reshapeable to a square
- matrix.
+ The square root of the weight vectors' size must be a real integer.
Args:
- n_features: Number of features in each vector.
- n_units: Number of units on the SOM.
+ data: Input data set.
+ shape: Shape of SOM.
Returns:
- Two-dimensional array of shape (n_units, n_features), in which each
- row is a flattened random stochastic matrix.
+ Array of SOM weights.
+
+ Notes:
+ Each row of the output array is to be considered a flattened
+ stochastic matrix, such that each ``N = sqrt(data.shape[1])`` values
+ are a discrete probability distribution forming the ``N``th row of
+ the matrix.
"""
- # check for square matrix
- n_rows = _np.sqrt(n_features)
+ n_rows = np.sqrt(data.shape[1])
if bool(n_rows - int(n_rows)):
- raise ValueError(f'Weight vector (len={n_features}) is not'
- 'reshapeable to square matrix.')
- else:
- n_rows = int(n_rows)
+ msg = (f'Weight vector with {n_rows} elements is not '
+ 'reshapeable to square matrix.')
+ raise ValueError(msg)
+
+ n_rows = int(n_rows)
+ n_units = np.prod(shape)
+ alpha = np.random.randint(1, 10, (n_rows, n_rows))
+ st_matrix = np.hstack([_stats.dirichlet.rvs(alpha=a, size=n_units)
+ for a in alpha])
+ return st_matrix
- # set alpha
- alpha = _np.full((n_rows, n_rows), 500)
- _np.fill_diagonal(alpha, 1000)
- # sample from dirichlet distributions
- st_matrix = _np.hstack([_stats.dirichlet.rvs(alpha=a, size=n_units)
- for a in alpha])
- return st_matrix
+def distribute(bmu_idx: Iterable[int], n_units: int
+ ) -> Dict[int, List[int]]:
+ """List training data matches per SOM unit.
+
+ This method assumes that the ith element of ``bmu_idx`` corresponds to the
+ ith vetor in a array of input data vectors.
+
+ Empty units result in empty list.
+ Args:
+ bmu_idx: Indices of best matching units.
+ n_units: Number of units on the SOM.
+
+ Returns:
+ Dictionary in which the keys represent the flat indices of SOM units.
+ The corresponding value is a list of indices of those training data
+ vectors that have been mapped to this unit.
+ """
+ unit_matches = {i:[] for i in range(n_units)}
+ for data_idx, bmu in enumerate(bmu_idx):
+ unit_matches[bmu].append(data_idx)
+ return unit_matches
diff --git a/src/apollon/tools.py b/src/apollon/tools.py
index 529c4c2397e06b376c9771c5666cf587e1f56c73..f7cf50569b946f20d5c050bd63f4a1409b946e07 100644
--- a/src/apollon/tools.py
+++ b/src/apollon/tools.py
@@ -42,7 +42,7 @@ def pca(data: Array, n_comps: int = 2) -> Tuple[Array, Array, Array]:
``n_comps`` largest eigen vectors,
transformed input data.
"""
- data_centered = (data - data.mean(axis=0)) / data.std(axis=0)
+ data_centered = (data - data.mean(axis=0))
_, vals, vects = np.linalg.svd(data_centered)
ord_idx = np.flip(vals.argsort())[:n_comps]
diff --git a/src/apollon/types.py b/src/apollon/types.py
index f698eb7447b03a5ba292ab25c8f5aa382946330e..06d0827eb4ed03d6a30788aeca41751690b288e6 100644
--- a/src/apollon/types.py
+++ b/src/apollon/types.py
@@ -7,6 +7,8 @@ import pathlib
from typing import (Any, Collection, Dict, Generator, Iterable, List, Optional,
Sequence, Tuple, Union)
import numpy as np
+from matplotlib import axes
+
Array = np.ndarray
ArrayOrStr = Union[Array, str]
@@ -16,3 +18,9 @@ ParamsType = Dict[str, Any]
PathType = Union[str, pathlib.Path]
PathGen = Generator[PathType, None, None]
Schema = Dict[str, Collection[str]]
+
+Shape = Tuple[int, int]
+Coord = Tuple[int, int]
+AdIndex = Tuple[List[int], List[int]]
+
+Axis = axes._axes.Axes
diff --git a/tests/som/test_neighbours.py b/tests/som/test_neighbours.py
new file mode 100644
index 0000000000000000000000000000000000000000..ff645a8ce2ddab2f90b902c5f97862df6df7fe8b
--- /dev/null
+++ b/tests/som/test_neighbours.py
@@ -0,0 +1,17 @@
+import unittest
+
+from hypothesis import strategies as hst
+import numpy as np
+from numpy.spatial import distance
+import scipy as sp
+
+from apollon.som import utilities as asu
+from apollon.som.som import IncrementalMap
+
+
+class TestIsNeighbour(unittest.TestCase):
+ def setUp(self):
+ self.som = IncrementralMap((10, 10, 3), 100, 0.5, 5)
+
+
+
diff --git a/tests/som/test_som.py b/tests/som/test_som.py
index ac89f7ee55d4f7f9bcc9778cbd077e1abc239cdf..b4cd0a0ec53f66c28b4c0e14a97d54245e16b38f 100644
--- a/tests/som/test_som.py
+++ b/tests/som/test_som.py
@@ -1,31 +1,93 @@
-#!/usr/bin/python3
-
-
import unittest
+from typing import Tuple
+
+from hypothesis import given
+import hypothesis.strategies as hst
import numpy as np
import scipy as sp
-from apollon.som.som import SelfOrganizingMap
+from apollon.som.som import SomBase, SomGrid
+
+SomDim = Tuple[int, int, int]
+dimension = hst.integers(min_value=2, max_value=50)
+som_dims = hst.tuples(dimension, dimension, dimension)
+
+
+class TestSomBase(unittest.TestCase):
+ @given(som_dims)
+ def test_dims(self, dims: SomDim) -> None:
+ som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
+ self.assertEqual(som.dims, dims)
+
+ @given(som_dims)
+ def test_dx(self, dims: SomDim) -> None:
+ som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
+ self.assertEqual(som.dx, dims[0])
+
+ @given(som_dims)
+ def test_dy(self, dims: SomDim) -> None:
+ som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
+ self.assertEqual(som.dy, dims[1])
+
+ @given(som_dims)
+ def test_dw(self, dims: SomDim) -> None:
+ som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
+ self.assertEqual(som.dw, dims[2])
+
+ @given(som_dims)
+ def test_n_units(self, dims: SomDim) -> None:
+ som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
+ self.assertEqual(som.n_units, dims[0]*dims[1])
+
+ @given(som_dims)
+ def test_shape(self, dims: SomDim) -> None:
+ som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
+ self.assertEqual(som.shape, (dims[0], dims[1]))
+
+ @given(som_dims)
+ def test_grid(self, dims: SomDim) -> None:
+ som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
+ self.assertIsInstance(som.grid, SomGrid)
+
+ """
+ @given(som_dims)
+ def test_dists(self, dims: SomDim) -> None:
+ som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
+ self.assertIsInstance(som.dists, np.ndarray)
+ """
+ @given(som_dims)
+ def test_weights(self, dims: SomDim) -> None:
+ som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
+ self.assertIsInstance(som.weights, np.ndarray)
-class TestSelfOrganizingMap(unittest.TestCase):
- def setUp(self):
- N = 100
+ @given(som_dims)
+ def test_match(self, dims: SomDim) -> None:
+ som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
+ data = np.random.rand(100, dims[2])
+ self.assertIsInstance(som.match(data), np.ndarray)
- m1 = (0, 0)
- m2 = (10, 15)
- c1 = ((10, 0), (0, 10))
- c2 = ((2, 0), (0, 2))
+ @given(som_dims)
+ def test_umatrix_has_map_shape(self, dims: SomDim) -> None:
+ som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
+ um = som.umatrix()
+ self.assertEqual(um.shape, som.shape)
- seg1 = np.random.multivariate_normal(m1, c1, N)
- seg2 = np.random.multivariate_normal(m2, c2, N)
+ @given(som_dims)
+ def test_umatrix_scale(self, dims: SomDim) -> None:
+ som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
+ som._weights = np.tile(np.arange(som.n_features), (som.n_units, 1))
+ som._weights[:, -1] = np.arange(som.n_units)
+ um = som.umatrix(scale=True, norm=False)
+ self.assertEqual(um[0, 0], um[-1, -1])
+ self.assertEqual(um[0, -1], um[-1, 0])
- self.data = np.vstack((seg1, seg2))
- self.dims = (10, 10, 2)
+ @given(som_dims)
+ def test_umatrix_norm(self, dims: SomDim) -> None:
+ som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
+ um = som.umatrix(norm=True)
+ self.assertEqual(um.max(), 1.0)
- def test_init_random(self):
- som = SelfOrganizingMap(self.dims, init_distr='uniform')
- self.assertTrue('weights', True)
if __name__ == '__main__':
unittest.main()
diff --git a/tests/som/test_som_utilities.py b/tests/som/test_som_utilities.py
index 1c37fba6f5f6d1a1f1aa18449acb3fc38ed73588..9b46bcaaebc9515fc338361adfbcde8a63921b81 100644
--- a/tests/som/test_som_utilities.py
+++ b/tests/som/test_som_utilities.py
@@ -1,12 +1,45 @@
-#!/usr/bin/python3
-
import unittest
+
+from hypothesis import strategies as hst
import numpy as np
+from scipy.spatial import distance
import scipy as sp
-from apollon.som import utilities
+from apollon.som import utilities as asu
+
+"""
+class TestMatch(unittest.TestCase):
+ def setUp(self) -> None:
+ self.weights = np.random.rand(100, 5)
+ self.data = np.random.rand(200, 5)
+
+ def test_returns_tuple(self) -> None:
+ res = asu.match(self.weights, self.data, 2, 'euclidean')
+ self.assertIsInstance(res, tuple)
+
+ def test_elements_are_arrays(self) -> None:
+ bmu, err = asu.match(self.weights, self.data, 'euclidean')
+ self.assertIsInstance(bmu, np.ndarray)
+ self.assertIsInstance(err, np.ndarray)
+ def test_correct_ordering(self) -> None:
+ kth = 5
+ bmu, err = asu.match(self.weights, self.data, 'euclidean')
+ wdists = distance.cdist(self.weights, self.data)
+ kswd = wdists.sort(axis=0)[:kth, :]
+"""
+class TestDistribute(unittest.TestCase):
+ def setUp(self) -> None:
+ self.n_units = 400
+ self.bmu = np.random.randint(0, self.n_units, 100)
+
+ def returns_dict(self):
+ res = asu.distribute(self.bmu, self.n_units)
+ self.assertIsInstance(res, dict)
+
+
+"""
class TestSelfOrganizingMap(unittest.TestCase):
def setUp(self):
self.weights = np.load('data/test_weights.npy')
@@ -18,7 +51,7 @@ class TestSelfOrganizingMap(unittest.TestCase):
bmu, err = utilities.best_match(self.weights, self.inp, 'euclidean')
self.assertTrue(np.array_equiv(test_bmu, bmu))
self.assertTrue(np.array_equiv(test_err, err))
-
+"""
if __name__ == '__main__':
unittest.main()