Commit 73d3c548 authored by Späth, Prof. Dr. Sebastian's avatar Späth, Prof. Dr. Sebastian
Add weighted Correlation algorithm

Fix some ambiguous Bundesland abbreviations and add an algorithm for
calculating weighted correlations.
from .wcorr import WeightedCorr
\ No newline at end of file
import numpy as np
import pandas as pd
from scipy.stats import rankdata
class WeightedCorr:
    """Weighted Pearson / Spearman correlation.

    Supply the data in exactly one of three ways -- ``xyw``, ``(x, y, w)`` or
    ``(df, wcol)`` -- then call the instance to obtain the result, e.g.::

        WeightedCorr(xyw=mydata[[x, y, w]])(method='pearson')
    """

    def __init__(self, xyw=None, x=None, y=None, w=None, df=None, wcol=None):
        """Validate the input and store the data used by the correlation methods.

        :param xyw: pd.DataFrame with shape (n, 3) containing the x, y and w
            columns, in that order (column names are irrelevant).
        :param x: pd.Series (n, ) containing values for x.
        :param y: pd.Series (n, ) containing values for y.
        :param w: pd.Series (n, ) containing weights.
        :param df: pd.DataFrame (n, m+1) containing m phenotypes and a weight
            column.
        :param wcol: str, name of the weight column in ``df``.
        :raises ValueError: if no data is supplied, if the three-column input
            does not have exactly three columns, if ``df``/``wcol`` have the
            wrong type, or if the argument combination is not one of the three
            supported ones.
        :raises TypeError: if neither (or both) of ``xyw`` as a DataFrame and
            ``x, y, w`` as Series are supplied.
        :raises KeyError: if ``wcol`` is not a column of ``df``.
        """
        if df is None and wcol is None:
            if all(arg is None for arg in (xyw, x, y, w)):
                raise ValueError('No data supplied')
            has_frame = isinstance(xyw, pd.DataFrame)
            has_series = all(isinstance(arg, pd.Series) for arg in (x, y, w))
            # Exactly one of the two input styles must be used.
            if has_frame == has_series:
                raise TypeError('xyw should be a pd.DataFrame, or x, y, w should be pd.Series')
            frame = xyw if has_frame else pd.concat([x, y, w], axis=1)
            if frame.shape[1] != 3:
                raise ValueError('xyw should contain exactly three columns (x, y, w)')
            # Columns are taken by position so duplicate column names (e.g.
            # three Series sharing one name) cannot break the lookup.
            columns = [
                pd.to_numeric(frame.iloc[:, k], errors='coerce').to_numpy(dtype=float, na_value=np.nan)
                for k in range(3)
            ]
            # Coerce first, drop afterwards: rows whose values only become NaN
            # through coercion must be removed as well, otherwise a single
            # non-numeric entry turns the whole result into NaN.
            complete = ~np.isnan(np.column_stack(columns)).any(axis=1)
            self.x, self.y, self.w = (col[complete] for col in columns)
            self.df = None
        elif df is not None and wcol is not None:
            if not isinstance(df, pd.DataFrame) or not isinstance(wcol, str):
                raise ValueError('df should be a pd.DataFrame and wcol should be a string')
            if wcol not in df.columns:
                raise KeyError('wcol not found in column names of df')
            # Everything except the weight column is treated as a phenotype.
            self.df = df.loc[:, [col for col in df.columns if col != wcol]]
            self.w = pd.to_numeric(df.loc[:, wcol], errors='coerce')
            # NOTE(review): rows with missing values are not dropped in this
            # mode -- confirm how NaNs should be handled for matrix input.
        else:
            # Only one of df / wcol was given.
            raise ValueError('Incorrect arguments specified, please specify xyw, or (x, y, w) or (df, wcol)')

    def _wcov(self, x, y, ms):
        """Return the weighted sum of cross-deviations of x and y.

        :param x: values of the first variable.
        :param y: values of the second variable.
        :param ms: pair ``[mean_x, mean_y]`` subtracted from x and y.
        :return: ``sum(w * (x - ms[0]) * (y - ms[1]))`` (not normalised by the
            weight total; the normalisation cancels in ``_pearson``).
        """
        return np.sum(self.w * (x - ms[0]) * (y - ms[1]))

    def _pearson(self, x=None, y=None):
        """Return the weighted Pearson correlation of x and y.

        Falls back to the stored ``self.x`` / ``self.y`` when both arguments
        are omitted.
        """
        if x is None and y is None:
            x, y = self.x, self.y
        total_weight = np.sum(self.w)
        mx, my = (np.sum(values * self.w) / total_weight for values in (x, y))
        return self._wcov(x, y, [mx, my]) / np.sqrt(self._wcov(x, x, [mx, mx]) * self._wcov(y, y, [my, my]))

    def _wrank(self, x):
        """Return weighted ranks of x.

        Each tie group (equal ``rankdata`` rank) gets the total weight of all
        lower-ranked groups plus the group's mean within-group position
        ``(counts + 1) / 2`` scaled by the group's average weight.
        """
        (unique, arr_inv, counts) = np.unique(rankdata(x), return_counts=True, return_inverse=True)
        # Total weight per tie group.
        a = np.bincount(arr_inv, self.w)
        return (np.cumsum(a) - a)[arr_inv] + ((counts + 1) / 2 * (a / counts))[arr_inv]

    def _spearman(self, x=None, y=None):
        """Return the weighted Spearman correlation: Pearson on weighted ranks.

        Falls back to the stored ``self.x`` / ``self.y`` when both arguments
        are omitted.
        """
        if x is None and y is None:
            x, y = self.x, self.y
        return self._pearson(self._wrank(x), self._wrank(y))

    def __call__(self, method='pearson'):
        """Compute the weighted correlation.

        :param method: Correlation method to be used: 'pearson' for pearson r,
            'spearman' for spearman rank-order correlation.
        :return: if xyw, or (x, y, w) were passed to __init__ returns the
            correlation value (float). If (df, wcol) were passed to __init__
            returns a pd.DataFrame (m, m), the correlation matrix.
        :raises ValueError: if ``method`` is not 'pearson' or 'spearman'.
        """
        if method not in ('pearson', 'spearman'):
            raise ValueError('method should be one of [\'pearson\', \'spearman\']')
        cor = {'pearson': self._pearson, 'spearman': self._spearman}[method]
        if self.df is None:
            return cor()

        labels = self.df.columns
        # Coerce every column once instead of once per pair.
        numeric = [
            pd.to_numeric(self.df.iloc[:, k], errors='coerce')
            for k in range(self.df.shape[1])
        ]
        values = np.full((len(numeric), len(numeric)), np.nan)
        # The matrix is symmetric: compute the lower triangle and mirror it.
        for i, col_i in enumerate(numeric):
            for j, col_j in enumerate(numeric[:i + 1]):
                values[i, j] = values[j, i] = cor(x=col_i, y=col_j)
        return pd.DataFrame(values, index=labels, columns=labels)
