Skip to content
Snippets Groups Projects
Commit 71164a60 authored by Blaß, Michael's avatar Blaß, Michael :speech_balloon:
Browse files

Merge branch 'feat-som-speedup' into develop

New SOM functionallity and general updates.
parents 8efaf128 b7ddb3ff
No related branches found
No related tags found
No related merge requests found
# Licensed under the terms of the BSD-3-Clause license. """apollon/aplot.py
# Copyright (C) 2019 Michael Blaß
# mblass@posteo.net
""" General plotting routines.
aplot.py -- General plotting routines.
Licensed under the terms of the BSD-3-Clause license.
Copyright (C) 2019 Michael Blaß
mblass@posteo.net
Functions: Functions:
fourplot Create a four plot of time a signal. fourplot Create a four plot of time a signal.
...@@ -12,8 +13,7 @@ Functions: ...@@ -12,8 +13,7 @@ Functions:
onest_decoding Plot decoded onsets over a signal. onest_decoding Plot decoded onsets over a signal.
signal Plot a time domain signal. signal Plot a time domain signal.
""" """
from typing import Iterable, Optional, Tuple, Union
from typing import Optional, Tuple
import matplotlib.pyplot as _plt import matplotlib.pyplot as _plt
import matplotlib.cm as _cm import matplotlib.cm as _cm
...@@ -22,26 +22,29 @@ from scipy import stats as _stats ...@@ -22,26 +22,29 @@ from scipy import stats as _stats
from . import _defaults from . import _defaults
from . import tools as _tools from . import tools as _tools
from . types import Array as _Array from . types import Array as _Array, Axis
Limits = Optional[Tuple[int, int]] Limits = Optional[Tuple[int, int]]
MplFig = Optional[_plt.Figure] MplFig = Optional[_plt.Figure]
FigSize = Tuple[float, float] FigSize = Tuple[float, float]
SubplotPos = Optional[Tuple[int, int, int]] SubplotPos = Optional[Tuple[int, int, int]]
Axes = Union[Axis, Iterable[Axis]]
def _nice_spines(ax, offset: int = 10) -> None: def outward_spines(axs: Axes, offset: float = 10.0) -> None:
"""Display only left and bottom spine and displace them. """Display only left and bottom spine and displace them.
Note:
Increasing ``offset`` may breaks the layout. Since the spine is moved,
so is the axis label, which is in turn forced out of the figure's bounds.
Args: Args:
ax: Axes to be modified. axs: Axis or iterable of axes.
offset: Move the spines ``offset`` pixels in the negative direction. offset: Move the spines ``offset`` pixels in the negative direction.
Note:
Increasing ``offset`` may breaks the layout. Since the spine is moved,
so is the axis label, which is in turn forced out of the figure's
bounds.
""" """
for ax in _np.atleast_1d(axs).ravel():
ax.spines['left'].set_position(('outward', offset)) ax.spines['left'].set_position(('outward', offset))
ax.spines['bottom'].set_position(('outward', offset)) ax.spines['bottom'].set_position(('outward', offset))
ax.spines['top'].set_visible(False) ax.spines['top'].set_visible(False)
...@@ -50,6 +53,23 @@ def _nice_spines(ax, offset: int = 10) -> None: ...@@ -50,6 +53,23 @@ def _nice_spines(ax, offset: int = 10) -> None:
ax.yaxis.set_ticks_position('left') ax.yaxis.set_ticks_position('left')
def center_spines(axs: Axes,
intersect: Tuple[float, float] = (0.0, 0.0)) -> None:
"""Display axes in crosshair fashion.
Args:
axs: Axis or iterable of axes.
intersect: Coordinate of axes' intersection point.
"""
for ax in _np.atleast_1d(axs).ravel():
ax.spines['left'].set_position(('axes', intersect[0]))
ax.spines['bottom'].set_position(('axes', intersect[1]))
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.xaxis.set_ticks_position('bottom')
ax.yaxis.set_ticks_position('left')
def _new_axis(spines: str = 'nice', fig: MplFig = None, sp_pos: SubplotPos = None, def _new_axis(spines: str = 'nice', fig: MplFig = None, sp_pos: SubplotPos = None,
axison: bool = True, **kwargs) -> tuple: axison: bool = True, **kwargs) -> tuple:
"""Create a new figure with a single axis and fancy spines. """Create a new figure with a single axis and fancy spines.
......
...@@ -20,7 +20,7 @@ import warnings as _warnings ...@@ -20,7 +20,7 @@ import warnings as _warnings
import numpy as _np import numpy as _np
import chains_addiction as _ca import chainsaddiction as _ca
import apollon import apollon
from apollon import types as _at from apollon import types as _at
......
...@@ -21,7 +21,7 @@ from typing import Any, Optional ...@@ -21,7 +21,7 @@ from typing import Any, Optional
import numpy as np import numpy as np
from .. types import PathType from .. types import Array, PathType
from . json import ArrayEncoder from . json import ArrayEncoder
def generate_outpath(in_path: PathType, def generate_outpath(in_path: PathType,
...@@ -177,8 +177,36 @@ def save_to_pickle(data: Any, path: PathType) -> None: ...@@ -177,8 +177,36 @@ def save_to_pickle(data: Any, path: PathType) -> None:
Args: Args:
data: Pickleable object. data: Pickleable object.
path: Path to safe the file. path: Path to save the file.
""" """
path = pathlib.Path(path) path = pathlib.Path(path)
with path.open('wb') as file: with path.open('wb') as file:
pickle.dump(data, file) pickle.dump(data, file)
def save_to_npy(data: Array, path: PathType) -> None:
"""Save an array to numpy binary format without using pickle.
Args:
data: Numpy array.
path: Path to save the file.
"""
path = pathlib.Path(path)
with path.open('wb') as file:
np.save(file, data, allow_pickle=False)
def load_from_npy(path: PathType) -> Array:
"""Load data from numpy's binary format.
Args:
path: File path.
Returns:
Data as numpy array.
"""
path = pathlib.Path(path)
with path.open('rb') as file:
data = np.load(file, allow_pickle=False)
return data
"""apollon/som/datasets.py
Licensed under the terms of the BSD-3-Clause license.
Copyright (C) 2019 Michael Blaß
mblass@posteo.net
Function for generating test and illustration data sets.
"""
from typing import Optional, Tuple
import numpy as np
from scipy import stats
def norm_circle(n_classes: int, n_per_class: int, class_std: int,
center: Tuple[int, int] = (0, 0), radius: int = 5,
seed: Optional[int] = None):
"""Generate ``n_per_class`` samples from ``n_classes`` bivariate normal
distributions, each with standard deviation ``class_std``. The means
are equidistantly placed on a circle with radius ``radius``.
Args:
n_classes: Number of classes.
n_per_class: Number of samples in each class.
class_std: Standard deviation for every class.
center: Center of ther circle.
radius: Radius of the circle on which the means are placed.
seed: Set the random seed.
Returns:
Data set and target vector.
"""
n_samples = n_classes * n_per_class
ang = np.pi * np.linspace(0, 360, n_classes, endpoint=False) / 180
xy_pos = np.stack((np.sin(ang), np.cos(ang)), axis=1)
xy_pos *= radius + np.asarray(center)
out = np.empty((n_samples, 2))
for i, pos in enumerate(xy_pos):
idx = slice(i*n_per_class, (i+1)*n_per_class)
distr = stats.multivariate_normal(pos, np.sqrt(class_std), seed=seed)
out[idx, :] = distr.rvs(n_per_class)
return out, np.arange(n_samples) // n_per_class
...@@ -9,10 +9,17 @@ Neighborhood computations ...@@ -9,10 +9,17 @@ Neighborhood computations
Functions: Functions:
gaussian N-Dimensional Gausian neighborhood. gaussian N-Dimensional Gausian neighborhood.
""" """
from typing import List, Tuple
import numpy as np import numpy as np
from scipy.spatial import distance from scipy.spatial import distance
from .. types import Array
Shape = Tuple[int, int]
Coord = Tuple[int, int]
AdIndex = Tuple[List[int], List[int]]
def gaussian(grid, center, radius): def gaussian(grid, center, radius):
"""Compute n-dimensional Gaussian neighbourhood. """Compute n-dimensional Gaussian neighbourhood.
...@@ -62,6 +69,22 @@ def star(grid, center, radius): ...@@ -62,6 +69,22 @@ def star(grid, center, radius):
return (dists <= radius).astype(int).T return (dists <= radius).astype(int).T
def neighborhood(grid, metric='sqeuclidean'):
"""Compute n-dimensional cityblock neighborhood.
The cityblock neighborhood is a star-shaped area
around ``center``.
Params:
grid: Array of n-dimensional indices.
metric: Distance metric.
Returns:
Pairwise distances of map units.
"""
return distance.squareform(distance.pdist(grid, metric))
def rect(grid, center, radius): def rect(grid, center, radius):
"""Compute n-dimensional Chebychev neighborhood. """Compute n-dimensional Chebychev neighborhood.
...@@ -79,3 +102,57 @@ def rect(grid, center, radius): ...@@ -79,3 +102,57 @@ def rect(grid, center, radius):
center = np.atleast_2d(center) center = np.atleast_2d(center)
dists = distance.cdist(center, grid, 'chebychev') dists = distance.cdist(center, grid, 'chebychev')
return (dists <= radius).astype(int).T return (dists <= radius).astype(int).T
""" NNNNNNNNNEEEEEEEEEEWWWWWW STUFFFFFFFF """
def gauss_kern(nhb, r):
return np.exp(-nhb/(r))
def is_neighbour(cra: Array, crb: Array, grid: Array, metric: str) -> Array:
"""Compute neighbourship between each coordinate in ``units_a`` abd
``units_b`` on ``grid``.
Args:
cra: (n x 2) array of grid coordinates.
crb: (n x 2) array of grid coordinates.
grid: SOM grid array.
metric: Name of distance metric function.
Returns:
One-dimensional boolean array. ``True`` in position n means that the
points ``cra[n]`` and ``crb[n]`` are direct neighbours on ``grid``
regarding ``metric``.
"""
def check_bounds(shape: Shape, point: Coord) -> bool:
"""Return ``True`` if ``point`` is valid index in ``shape``.
Args:
shape: Shape of two-dimensional array.
point: Two-dimensional coordinate.
Return:
True if ``point`` is within ``shape`` else ``False``.
"""
return (0 <= point[0] < shape[0]) and (0 <= point[1] < shape[1])
def direct_rect_nb(shape: Shape, point: Coord) -> AdIndex:
"""Return the set of direct neighbours of ``point`` given rectangular
topology.
Args:
shape: Shape of two-dimensional array.
point: Two-dimensional coordinate.
Returns:
Advanced index of points in neighbourhood set.
"""
nhb = []
for i in range(point[0]-1, point[0]+2):
for j in range(point[1]-1, point[1]+2):
if check_bounds(shape, (i, j)):
nhb.append((i, j))
return np.asarray(nhb)
...@@ -4,13 +4,37 @@ ...@@ -4,13 +4,37 @@
"""apollon/som/plot.py """apollon/som/plot.py
""" """
from typing import Tuple from typing import Optional, Tuple, Union
import matplotlib as mpl
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
from apollon import tools from apollon import tools
from apollon.types import Array from apollon import aplot
from apollon.types import Array, Axis, Shape
def umatrix(ax: Axis, umx: Array, outline: bool = False, **kwargs) -> None:
"""Plot the U-matrix.
Args:
ax: Axis subplot.
umx: U-matrix data.
Returns:
Image.
"""
defaults = {
'cmap': 'terrain',
'levels': 20}
for k, v in kwargs.items():
_ = kwargs.setdefault(k, v)
ax.contourf(umx, **kwargs)
if outline:
ax.contour(umx, cmap='Greys_r', alpha=.7)
return ax
def plot_calibration(self, lables=None, ax=None, cmap='plasma', **kwargs): def plot_calibration(self, lables=None, ax=None, cmap='plasma', **kwargs):
...@@ -83,31 +107,6 @@ def plot_qerror(self, ax=None, **kwargs): ...@@ -83,31 +107,6 @@ def plot_qerror(self, ax=None, **kwargs):
label='Quantizationerror') label='Quantizationerror')
def plot_umatrix(self, interp='None', cmap='viridis', ax=None, **kwargs):
"""Plot unified distance matrix.
The unified distance matrix (udm) allows to visualize weight matrices
of high dimensional weight vectors. The entries (x, y) of the udm
correspondto the arithmetic mean of the distances between weight
vector (x, y) and its 4-neighbourhood.
Args:
w: Neighbourhood width.
interp: matplotlib interpolation method name.
ax: Provide custom axis object.
Returns:
axis, umatrix
"""
if ax is None:
fig, ax = aplot._new_axis()
udm = _som_utils.umatrix(self.weights, self.shape, metric=self.metric)
ax.set_title('Unified distance matrix')
ax.set_xlabel('# units')
ax.set_ylabel('# units')
ax.imshow(udm, interpolation=interp, cmap=cmap, origin='lower')
return ax, udm
def plot_umatrix3d(self, w=1, cmap='viridis', **kwargs): def plot_umatrix3d(self, w=1, cmap='viridis', **kwargs):
...@@ -250,3 +249,96 @@ def weights_line(weights: Array, dims: Tuple, color: str = 'r', ...@@ -250,3 +249,96 @@ def weights_line(weights: Array, dims: Tuple, color: str = 'r',
ax.plot(wv, color=color) ax.plot(wv, color=color)
return fig, axs return fig, axs
def wire(ax: Axis, weights: Array, shape: Shape, *,
unit_size: Union[int, float, Array] = 100.0,
line_width: Union[int, float] = 1.0,
highlight: Optional[Array] = None, labels: bool = False, **kwargs):
"""Plot the weight vectors of a SOM with two-dimensional feature space.
Neighbourhood relations are indicate by connecting lines.
Args:
ax: The axis subplot.
weights: SOM weigth matrix.
shape: SOM shape.
unit_size: Size for each unit.
line_width: Width of the wire lines.
highlight: Index of units to be marked in different color.
labels: If ``True``, attach a box with coordinates to each unit.
Returns:
vlines, hlines, bgmarker, umarker
"""
unit_color = 'k'
bg_color = 'w'
hl_color = 'r'
alpha = .7
if isinstance(unit_size, np.ndarray):
marker_size = tools.scale(unit_size, 10, 110)
elif isinstance(unit_size, float) or isinstance(unit_size, int):
marker_size = np.repeat(unit_size, weights.shape[0])
else:
msg = (f'Argument of parameter ``unit_size`` must be real scalar '
'or one-dimensional numpy array.')
raise ValueError(msg)
marker_size_bg = marker_size + marker_size / 100 * 30
if highlight is not None:
bg_color = np.where(highlight, hl_color, bg_color)
rsw = weights.reshape(*shape, 2)
vx, vy = rsw.T
hx, hy = np.rollaxis(rsw, 1).T
ax.set_aspect('equal')
vlines = ax.plot(vx, vy, unit_color, alpha=alpha, lw=line_width, zorder=9)
hlines = ax.plot(hx, hy, unit_color, alpha=alpha, lw=line_width, zorder=9)
bgmarker = ax.scatter(vx, vy, s=marker_size_bg, c=bg_color,
edgecolors='None', zorder=11)
umarker = ax.scatter(vx, vy, s=marker_size, c=unit_color, alpha=alpha,
edgecolors='None', zorder=12)
font = {'fontsize': 4,
'va': 'bottom',
'ha': 'center'}
bbox = {'alpha': 0.7,
'boxstyle': 'round',
'edgecolor': '#aaaaaa',
'facecolor': '#dddddd',
'linewidth': .5,
}
if labels is True:
for (px, py), (ix, iy) in zip(weights, np.ndindex(shape)):
ax.text(px+1.3, py, f'({ix}, {iy})', font, bbox=bbox, zorder=13)
return vlines, hlines, bgmarker, umarker
def data_2d(ax: Axis, data: Array, colors: Array,
**kwargs) -> mpl.collections.PathCollection:
"""Scatter plot a data set with two-dimensional feature space.
This just the usual scatter command with some reasonable defaults.
Args:
ax: The axis subplot.
data: The data set.
colors: Colors for each elemet in ``data``.
Returns:
PathCollection.
"""
defaults = {
'alpha': 0.2,
'c': colors,
'cmap': 'plasma',
'edgecolors': 'None',
's': 10}
for k, v in defaults.items():
_ = kwargs.setdefault(k, v)
aplot.outward_spines(ax)
return ax.scatter(*data.T, **kwargs)
This diff is collapsed.
...@@ -9,32 +9,10 @@ ...@@ -9,32 +9,10 @@
Topologies for self-organizing maps. Topologies for self-organizing maps.
Functions: Functions:
rect_neighbourhood Return rectangular neighbourhood.
vn_neighbourhood Return 4-neighbourhood. vn_neighbourhood Return 4-neighbourhood.
""" """
import numpy as np
import numpy as _np
def rect_neighbourhood(mat_shape, point, w=1):
if point[0] - w < 0:
rows1 = 0
else:
rows1 = point[0] - w
rows2 = point[0] + w + 1
if point[1] - w < 0:
cols1 = 0
else:
cols1 = point[1] - w
cols2 = point[1] + w + 1
mask = _np.ones(mat_shape)
mask[rows1:rows2, cols1:cols2] = 0
mask[point] = 1
out = _np.ma.masked_array(mask, mask=mask)
return out
def vn_neighbourhood(x, y, dx, dy, flat=False): def vn_neighbourhood(x, y, dx, dy, flat=False):
...@@ -67,7 +45,7 @@ def vn_neighbourhood(x, y, dx, dy, flat=False): ...@@ -67,7 +45,7 @@ def vn_neighbourhood(x, y, dx, dy, flat=False):
nh.append((x, y+1)) nh.append((x, y+1))
if flat: if flat:
nh = _np.array(nh) nh = np.array(nh)
return _np.ravel_multi_index(nh.T, (dx, dy)) return np.ravel_multi_index(nh.T, (dx, dy))
else: else:
return nh return nh
# Licensed under the terms of the BSD-3-Clause license. """apollon/som/utilites.py
# Copyright (C) 2019 Michael Blaß
# mblass@posteo.net
"""apollon/som/uttilites.py
Utilities for self.organizing maps. Utilities for self.organizing maps.
Functions: Licensed under the terms of the BSD-3-Clause license.
activation_map Plot activation map Copyright (C) 2019 Michael Blaß
distance_map Plot a distance map mblass@posteo.net
distance_map3d Plot a 3d distance map
""" """
import itertools import itertools
from typing import Iterator, Tuple from typing import Dict, Iterable, Iterator, List, Tuple
import matplotlib.pyplot as _plt import numpy as np
from mpl_toolkits.mplot3d import Axes3D
import numpy as _np
from scipy.spatial import distance as _distance from scipy.spatial import distance as _distance
from scipy import stats as _stats from scipy import stats as _stats
import apollon.som.topologies as _topologies from apollon.types import Array, Shape
from apollon.types import Array from apollon import tools
def grid_iter(n_rows: int, n_cols: int) -> Iterator[Tuple[int, int]]: def grid_iter(n_rows: int, n_cols: int) -> Iterator[Tuple[int, int]]:
...@@ -47,13 +40,7 @@ def grid(n_rows: int, n_cols: int) -> Array: ...@@ -47,13 +40,7 @@ def grid(n_rows: int, n_cols: int) -> Array:
Returns: Returns:
Two-dimensional array in which each row represents an multi-index. Two-dimensional array in which each row represents an multi-index.
""" """
return _np.array(list(grid_iter(n_rows, n_cols))) return np.array(list(grid_iter(n_rows, n_cols)))
def activation_map(som, **kwargs):
ax = _plt.gca()
am = som.activation_map.reshape(som.shape[:2])
ax.imshow(_np.flipud(am), vmin=0, vmax=som.activation_map.max(), **kwargs)
def decrease_linear(start: float, step: float, stop: float = 1.0 def decrease_linear(start: float, step: float, stop: float = 1.0
...@@ -77,10 +64,17 @@ def decrease_expo(start: float, step: float,stop: float = 1.0 ...@@ -77,10 +64,17 @@ def decrease_expo(start: float, step: float,stop: float = 1.0
elif step == 1: elif step == 1:
yield start yield start
else: else:
b = _np.log(stop / start) / (step-1) b = np.log(stop / start) / (step-1)
for x in range(step): for x in range(step):
yield start * _np.exp(b*x) yield start * np.exp(b*x)
"""
def match(weights: Array, data: Array, kth, metric: str):
dists = _distance.cdist(weights, data, metric)
idx = dists.argpartition(kth, axis=0)
min_vals = dists[min_idx]
return (min_idx, min_vals)
"""
def best_match(weights: Array, inp: Array, metric: str): def best_match(weights: Array, inp: Array, metric: str):
"""Compute the best matching unit of ``weights`` for each """Compute the best matching unit of ``weights`` for each
...@@ -101,78 +95,126 @@ def best_match(weights: Array, inp: Array, metric: str): ...@@ -101,78 +95,126 @@ def best_match(weights: Array, inp: Array, metric: str):
Index and error of best matching units. Index and error of best matching units.
""" """
if weights.ndim != 2: if weights.ndim != 2:
raise ValueError(f'Array ``weights`` has {weights.ndim} dimensions, it' msg = (f'Array ``weights`` has {weights.ndim} dimensions, it '
'has to have exactly two dimensions.') 'has to have exactly two dimensions.')
raise ValueError(msg)
if weights.shape[-1] != inp.shape[-1]: if weights.shape[-1] != inp.shape[-1]:
raise ValueError(f'Feature dimension of ``weights`` has ' msg = (f'Feature dimension of ``weights`` has {weights.shape[0]} '
'{weights.shape[0]} elemets, whereas ``inp`` has {inp.shape[-1]} ' 'elemets, whereas ``inp`` has {inp.shape[-1]} elemets. '
'elemets. but they have, however, ' 'to match exactly.') 'However, both dimensions have to match exactly.')
raise ValueError(msg)
inp = _np.atleast_2d(inp) inp = np.atleast_2d(inp)
if inp.ndim > 2: if inp.ndim > 2:
raise ValueError(f'Array ``inp`` has {weights.ndim} dimensions, it ' msg = (f'Array ``inp`` has {weights.ndim} dimensions, it '
'has to have one or two dimensions.') 'has to have one or two dimensions.')
raise ValueError(msg)
dists = _distance.cdist(weights, inp, metric) dists = _distance.cdist(weights, inp, metric)
return dists.argmin(axis=0), dists.min(axis=0) return dists.argmin(axis=0), dists.min(axis=0)
def umatrix(weights, dxy, metric='euclidean'): def sample_pca(data: Array, shape: Shape, adapt: bool = True) -> Array:
"""Compute unified distance matrix. """Compute initial SOM weights by sampling from the first two principal
components of the input data.
Args: Args:
weights: SOM weights matrix. data: Input data set.
dxy: shape: Shape of SOM.
metric: Metric to use. adapt: If ``True``, the largest value of ``shape`` is applied to the
principal component with the largest sigular value. This
orients the map, such that map dimension with the most units
coincides with principal component with the largest variance.
Returns: Returns:
Unified distance matrix. Array of SOM weights.
""" """
out = _np.empty(dxy, dtype='float64') vals, vects, trans_data = tools.pca(data, 2)
data_limits = np.column_stack((trans_data.min(axis=0),
trans_data.max(axis=0)))
if adapt:
shape = sorted(shape, reverse=True)
dim_x = np.linspace(*data_limits[0], shape[0])
dim_y = np.linspace(*data_limits[1], shape[1])
grid_x, grid_y = np.meshgrid(dim_x, dim_y)
points = np.vstack((grid_x.ravel(), grid_y.ravel()))
weights = points.T @ vects + data.mean(axis=0)
return weights
def sample_rnd(data: Array, shape: Shape) -> Array:
"""Compute initial SOM weights by sampling uniformly from the data space.
for i, mi in enumerate(_np.ndindex(dxy)): Args:
nh_flat_idx = _topologies.vn_neighbourhood(*mi, *dxy, flat=True) data: Input data set
p = weights[i][None] shape: Shape of SOM.
nh = weights[nh_flat_idx]
out[mi] = _distance.cdist(p, nh).sum() / len(nh)
return out / out.max() Returns:
Array of SOM weights.
"""
n_units = np.prod(shape)
data_limits = np.column_stack((data.max(axis=0), data.min(axis=0)))
weights = [np.random.uniform(*lim, n_units) for lim in data_limits]
return np.column_stack(weights)
def init_simplex(n_features, n_units): def sample_stm(data: Array, shape: Shape):
"""Initialize the weights with stochastic matrices. """Compute initial SOM weights by sampling stochastic matrices from
Dirichlet distribution.
The rows of each n by n stochastic matrix are sampes drawn from the The rows of each n by n stochastic matrix are sampes drawn from the
Dirichlet distribution, where n is the number of rows and cols of the Dirichlet distribution, where n is the number of rows and cols of the
matrix. The diagonal elemets of the matrices are set to twice the matrix. The diagonal elemets of the matrices are set to twice the
probability of the remaining elements. probability of the remaining elements.
The square root n of the weight vectors' size must be element of the The square root of the weight vectors' size must be a real integer.
natural numbers, so that the weight vector is reshapeable to a square
matrix.
Args: Args:
n_features: Number of features in each vector. data: Input data set.
n_units: Number of units on the SOM. shape: Shape of SOM.
Returns: Returns:
Two-dimensional array of shape (n_units, n_features), in which each Array of SOM weights.
row is a flattened random stochastic matrix.
Notes:
Each row of the output array is to be considered a flattened
stochastic matrix, such that each ``N = sqrt(data.shape[1])`` values
are a discrete probability distribution forming the ``N``th row of
the matrix.
""" """
# check for square matrix n_rows = np.sqrt(data.shape[1])
n_rows = _np.sqrt(n_features)
if bool(n_rows - int(n_rows)): if bool(n_rows - int(n_rows)):
raise ValueError(f'Weight vector (len={n_features}) is not' msg = (f'Weight vector with {n_rows} elements is not '
'reshapeable to square matrix.') 'reshapeable to square matrix.')
else: raise ValueError(msg)
n_rows = int(n_rows)
# set alpha n_rows = int(n_rows)
alpha = _np.full((n_rows, n_rows), 500) n_units = np.prod(shape)
_np.fill_diagonal(alpha, 1000) alpha = np.random.randint(1, 10, (n_rows, n_rows))
st_matrix = np.hstack([_stats.dirichlet.rvs(alpha=a, size=n_units)
# sample from dirichlet distributions
st_matrix = _np.hstack([_stats.dirichlet.rvs(alpha=a, size=n_units)
for a in alpha]) for a in alpha])
return st_matrix return st_matrix
def distribute(bmu_idx: Iterable[int], n_units: int
) -> Dict[int, List[int]]:
"""List training data matches per SOM unit.
This method assumes that the ith element of ``bmu_idx`` corresponds to the
ith vetor in a array of input data vectors.
Empty units result in empty list.
Args:
bmu_idx: Indices of best matching units.
n_units: Number of units on the SOM.
Returns:
Dictionary in which the keys represent the flat indices of SOM units.
The corresponding value is a list of indices of those training data
vectors that have been mapped to this unit.
"""
unit_matches = {i:[] for i in range(n_units)}
for data_idx, bmu in enumerate(bmu_idx):
unit_matches[bmu].append(data_idx)
return unit_matches
...@@ -42,7 +42,7 @@ def pca(data: Array, n_comps: int = 2) -> Tuple[Array, Array, Array]: ...@@ -42,7 +42,7 @@ def pca(data: Array, n_comps: int = 2) -> Tuple[Array, Array, Array]:
``n_comps`` largest eigen vectors, ``n_comps`` largest eigen vectors,
transformed input data. transformed input data.
""" """
data_centered = (data - data.mean(axis=0)) / data.std(axis=0) data_centered = (data - data.mean(axis=0))
_, vals, vects = np.linalg.svd(data_centered) _, vals, vects = np.linalg.svd(data_centered)
ord_idx = np.flip(vals.argsort())[:n_comps] ord_idx = np.flip(vals.argsort())[:n_comps]
......
...@@ -7,6 +7,8 @@ import pathlib ...@@ -7,6 +7,8 @@ import pathlib
from typing import (Any, Collection, Dict, Generator, Iterable, List, Optional, from typing import (Any, Collection, Dict, Generator, Iterable, List, Optional,
Sequence, Tuple, Union) Sequence, Tuple, Union)
import numpy as np import numpy as np
from matplotlib import axes
Array = np.ndarray Array = np.ndarray
ArrayOrStr = Union[Array, str] ArrayOrStr = Union[Array, str]
...@@ -16,3 +18,9 @@ ParamsType = Dict[str, Any] ...@@ -16,3 +18,9 @@ ParamsType = Dict[str, Any]
PathType = Union[str, pathlib.Path] PathType = Union[str, pathlib.Path]
PathGen = Generator[PathType, None, None] PathGen = Generator[PathType, None, None]
Schema = Dict[str, Collection[str]] Schema = Dict[str, Collection[str]]
Shape = Tuple[int, int]
Coord = Tuple[int, int]
AdIndex = Tuple[List[int], List[int]]
Axis = axes._axes.Axes
import unittest
from hypothesis import strategies as hst
import numpy as np
from numpy.spatial import distance
import scipy as sp
from apollon.som import utilities as asu
from apollon.som.som import IncrementalMap
class TestIsNeighbour(unittest.TestCase):
def setUp(self):
self.som = IncrementralMap((10, 10, 3), 100, 0.5, 5)
#!/usr/bin/python3
import unittest import unittest
from typing import Tuple
from hypothesis import given
import hypothesis.strategies as hst
import numpy as np import numpy as np
import scipy as sp import scipy as sp
from apollon.som.som import SelfOrganizingMap from apollon.som.som import SomBase, SomGrid
SomDim = Tuple[int, int, int]
dimension = hst.integers(min_value=2, max_value=50)
som_dims = hst.tuples(dimension, dimension, dimension)
class TestSomBase(unittest.TestCase):
@given(som_dims)
def test_dims(self, dims: SomDim) -> None:
som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
self.assertEqual(som.dims, dims)
@given(som_dims)
def test_dx(self, dims: SomDim) -> None:
som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
self.assertEqual(som.dx, dims[0])
@given(som_dims)
def test_dy(self, dims: SomDim) -> None:
som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
self.assertEqual(som.dy, dims[1])
@given(som_dims)
def test_dw(self, dims: SomDim) -> None:
som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
self.assertEqual(som.dw, dims[2])
@given(som_dims)
def test_n_units(self, dims: SomDim) -> None:
som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
self.assertEqual(som.n_units, dims[0]*dims[1])
@given(som_dims)
def test_shape(self, dims: SomDim) -> None:
som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
self.assertEqual(som.shape, (dims[0], dims[1]))
@given(som_dims)
def test_grid(self, dims: SomDim) -> None:
som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
self.assertIsInstance(som.grid, SomGrid)
"""
@given(som_dims)
def test_dists(self, dims: SomDim) -> None:
som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
self.assertIsInstance(som.dists, np.ndarray)
"""
@given(som_dims)
def test_weights(self, dims: SomDim) -> None:
som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
self.assertIsInstance(som.weights, np.ndarray)
class TestSelfOrganizingMap(unittest.TestCase): @given(som_dims)
def setUp(self): def test_match(self, dims: SomDim) -> None:
N = 100 som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
data = np.random.rand(100, dims[2])
self.assertIsInstance(som.match(data), np.ndarray)
m1 = (0, 0) @given(som_dims)
m2 = (10, 15) def test_umatrix_has_map_shape(self, dims: SomDim) -> None:
c1 = ((10, 0), (0, 10)) som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
c2 = ((2, 0), (0, 2)) um = som.umatrix()
self.assertEqual(um.shape, som.shape)
seg1 = np.random.multivariate_normal(m1, c1, N) @given(som_dims)
seg2 = np.random.multivariate_normal(m2, c2, N) def test_umatrix_scale(self, dims: SomDim) -> None:
som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
som._weights = np.tile(np.arange(som.n_features), (som.n_units, 1))
som._weights[:, -1] = np.arange(som.n_units)
um = som.umatrix(scale=True, norm=False)
self.assertEqual(um[0, 0], um[-1, -1])
self.assertEqual(um[0, -1], um[-1, 0])
self.data = np.vstack((seg1, seg2)) @given(som_dims)
self.dims = (10, 10, 2) def test_umatrix_norm(self, dims: SomDim) -> None:
som = SomBase(dims, 100, 0.1, 10, 'gaussian', 'uniform', 'euclidean')
um = som.umatrix(norm=True)
self.assertEqual(um.max(), 1.0)
def test_init_random(self):
som = SelfOrganizingMap(self.dims, init_distr='uniform')
self.assertTrue('weights', True)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
#!/usr/bin/python3
import unittest import unittest
from hypothesis import strategies as hst
import numpy as np import numpy as np
from scipy.spatial import distance
import scipy as sp import scipy as sp
from apollon.som import utilities from apollon.som import utilities as asu
"""
class TestMatch(unittest.TestCase):
def setUp(self) -> None:
self.weights = np.random.rand(100, 5)
self.data = np.random.rand(200, 5)
def test_returns_tuple(self) -> None:
res = asu.match(self.weights, self.data, 2, 'euclidean')
self.assertIsInstance(res, tuple)
def test_elements_are_arrays(self) -> None:
bmu, err = asu.match(self.weights, self.data, 'euclidean')
self.assertIsInstance(bmu, np.ndarray)
self.assertIsInstance(err, np.ndarray)
def test_correct_ordering(self) -> None:
kth = 5
bmu, err = asu.match(self.weights, self.data, 'euclidean')
wdists = distance.cdist(self.weights, self.data)
kswd = wdists.sort(axis=0)[:kth, :]
"""
class TestDistribute(unittest.TestCase):
def setUp(self) -> None:
self.n_units = 400
self.bmu = np.random.randint(0, self.n_units, 100)
def returns_dict(self):
res = asu.distribute(self.bmu, self.n_units)
self.assertIsInstance(res, dict)
"""
class TestSelfOrganizingMap(unittest.TestCase): class TestSelfOrganizingMap(unittest.TestCase):
def setUp(self): def setUp(self):
self.weights = np.load('data/test_weights.npy') self.weights = np.load('data/test_weights.npy')
...@@ -18,7 +51,7 @@ class TestSelfOrganizingMap(unittest.TestCase): ...@@ -18,7 +51,7 @@ class TestSelfOrganizingMap(unittest.TestCase):
bmu, err = utilities.best_match(self.weights, self.inp, 'euclidean') bmu, err = utilities.best_match(self.weights, self.inp, 'euclidean')
self.assertTrue(np.array_equiv(test_bmu, bmu)) self.assertTrue(np.array_equiv(test_bmu, bmu))
self.assertTrue(np.array_equiv(test_err, err)) self.assertTrue(np.array_equiv(test_err, err))
"""
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment