Source code for kenchi.outlier_detection.statistical

import numpy as np
from sklearn.cluster import affinity_propagation
from sklearn.covariance import GraphicalLasso
from sklearn.mixture import GaussianMixture
from sklearn.neighbors import KernelDensity
from sklearn.utils.validation import check_is_fitted

from .base import BaseOutlierDetector
from ..plotting import plot_graphical_model, plot_partial_corrcoef

__all__ = ['GMM', 'HBOS', 'KDE', 'SparseStructureLearning']


class GMM(BaseOutlierDetector):
    """Outlier detector using Gaussian Mixture Models (GMMs).

    Parameters
    ----------
    contamination : float, default 0.1
        Proportion of outliers in the data set. Used to define the threshold.

    covariance_type : str, default 'full'
        String describing the type of covariance parameters to use. Valid
        options are ['full'|'tied'|'diag'|'spherical'].

    init_params : str, default 'kmeans'
        Method used to initialize the weights, the means and the precisions.
        Valid options are ['kmeans'|'random'].

    max_iter : int, default 100
        Maximum number of iterations.

    means_init : array-like of shape (n_components, n_features), default None
        User-provided initial means.

    n_components : int, default 1
        Number of mixture components.

    n_init : int, default 1
        Number of initializations to perform.

    precisions_init : array-like, default None
        User-provided initial precisions.

    random_state : int or RandomState instance, default None
        Seed of the pseudo random number generator.

    reg_covar : float, default 1e-06
        Non-negative regularization added to the diagonal of covariance.

    tol : float, default 1e-03
        Tolerance to declare convergence.

    warm_start : bool, default False
        If True, the solution of the last fitting is used as initialization
        for the next call of ``fit``.

    weights_init : array-like of shape (n_components,), default None
        User-provided initial weights.

    Attributes
    ----------
    anomaly_score_ : array-like of shape (n_samples,)
        Anomaly score for each training sample.

    contamination_ : float
        Actual proportion of outliers in the data set.

    threshold_ : float
        Threshold.

    Examples
    --------
    >>> import numpy as np
    >>> from kenchi.outlier_detection import GMM
    >>> X = np.array([
    ...     [0., 0.], [1., 1.], [2., 0.], [3., -1.], [4., 0.],
    ...     [5., 1.], [6., 0.], [7., -1.], [8., 0.], [1000., 1.]
    ... ])
    >>> det = GMM(random_state=0)
    >>> det.fit_predict(X)
    array([ 1,  1,  1,  1,  1,  1,  1,  1,  1, -1])
    """

    @property
    def converged_(self):
        """bool: True when convergence was reached in ``fit``, False
        otherwise.
        """
        return self.estimator_.converged_

    @property
    def covariances_(self):
        """array-like: Covariance of each mixture component."""
        return self.estimator_.covariances_

    @property
    def lower_bound_(self):
        """float: Log-likelihood of the best fit of EM."""
        return self.estimator_.lower_bound_

    @property
    def means_(self):
        """array-like of shape (n_components, n_features): Mean of each
        mixture component.
        """
        return self.estimator_.means_

    @property
    def n_iter_(self):
        """int: Number of steps used by the best fit of EM to reach
        convergence.
        """
        return self.estimator_.n_iter_

    @property
    def precisions_(self):
        """array-like: Precision matrix for each component in the mixture."""
        return self.estimator_.precisions_

    @property
    def precisions_cholesky_(self):
        """array-like: Cholesky decomposition of the precision matrices of
        each mixture component.
        """
        return self.estimator_.precisions_cholesky_

    @property
    def weights_(self):
        """array-like of shape (n_components,): Weight of each mixture
        component.
        """
        return self.estimator_.weights_

    def __init__(
        self, contamination=0.1, covariance_type='full',
        init_params='kmeans', max_iter=100, means_init=None, n_components=1,
        n_init=1, precisions_init=None, random_state=None, reg_covar=1e-06,
        tol=1e-03, warm_start=False, weights_init=None
    ):
        self.contamination   = contamination
        self.covariance_type = covariance_type
        self.init_params     = init_params
        self.max_iter        = max_iter
        self.means_init      = means_init
        self.n_components    = n_components
        self.n_init          = n_init
        self.precisions_init = precisions_init
        self.random_state    = random_state
        self.reg_covar       = reg_covar
        self.tol             = tol
        self.warm_start      = warm_start
        self.weights_init    = weights_init

    def _check_is_fitted(self):
        super()._check_is_fitted()

        check_is_fitted(
            self,
            [
                'converged_', 'covariances_', 'lower_bound_', 'means_',
                'n_iter_', 'precisions_', 'precisions_cholesky_', 'weights_'
            ]
        )

    def _fit(self, X):
        self.estimator_ = GaussianMixture(
            covariance_type = self.covariance_type,
            init_params     = self.init_params,
            max_iter        = self.max_iter,
            means_init      = self.means_init,
            n_components    = self.n_components,
            n_init          = self.n_init,
            precisions_init = self.precisions_init,
            random_state    = self.random_state,
            reg_covar       = self.reg_covar,
            tol             = self.tol,
            warm_start      = self.warm_start,
            weights_init    = self.weights_init
        ).fit(X)

        return self

    def _anomaly_score(self, X):
        return -self.estimator_.score_samples(X)
class HBOS(BaseOutlierDetector):
    """Histogram-based outlier detector.

    Parameters
    ----------
    bins : int or str, default 'auto'
        Number of histogram bins.

    contamination : float, default 0.1
        Proportion of outliers in the data set. Used to define the threshold.

    novelty : bool, default False
        If True, you can use ``predict``, ``decision_function`` and
        ``anomaly_score`` on new unseen data and not on the training data.

    Attributes
    ----------
    anomaly_score_ : array-like of shape (n_samples,)
        Anomaly score for each training sample.

    contamination_ : float
        Actual proportion of outliers in the data set.

    threshold_ : float
        Threshold.

    bin_edges_ : array-like
        Bin edges.

    data_max_ : array-like of shape (n_features,)
        Per-feature maximum seen in the data.

    data_min_ : array-like of shape (n_features,)
        Per-feature minimum seen in the data.

    hist_ : array-like
        Values of the histogram.

    References
    ----------
    .. [#goldstein12] Goldstein, M., and Dengel, A.,
        "Histogram-based outlier score (HBOS): A fast unsupervised anomaly
        detection algorithm," KI: Poster and Demo Track, pp. 59-63, 2012.

    Examples
    --------
    >>> import numpy as np
    >>> from kenchi.outlier_detection import HBOS
    >>> X = np.array([
    ...     [0., 0.], [1., 1.], [2., 0.], [3., -1.], [4., 0.],
    ...     [5., 1.], [6., 0.], [7., -1.], [8., 0.], [1000., 1.]
    ... ])
    >>> det = HBOS()
    >>> det.fit_predict(X)
    array([ 1,  1,  1,  1,  1,  1,  1,  1,  1, -1])
    """

    def __init__(self, bins='auto', contamination=0.1, novelty=False):
        self.bins          = bins
        self.contamination = contamination
        self.novelty       = novelty

    def _check_is_fitted(self):
        super()._check_is_fitted()

        check_is_fitted(self, ['bin_edges_', 'hist_'])

    def _fit(self, X):
        _, n_features   = X.shape
        self.data_max_  = np.max(X, axis=0)
        self.data_min_  = np.min(X, axis=0)
        self.hist_      = np.empty(n_features, dtype=object)
        self.bin_edges_ = np.empty(n_features, dtype=object)

        for j, col in enumerate(X.T):
            self.hist_[j], self.bin_edges_[j] = np.histogram(
                col, bins=self.bins, density=True
            )

        return self

    def _anomaly_score(self, X):
        n_samples, _  = X.shape
        anomaly_score = np.zeros(n_samples)

        for j, col in enumerate(X.T):
            bins,       = self.hist_[j].shape
            bin_width   = self.bin_edges_[j][1] - self.bin_edges_[j][0]
            is_in_range = (
                (self.data_min_[j] <= col) & (col <= self.data_max_[j])
            )

            # Assign each value to a bin; values sitting on the rightmost
            # edge are pulled back into the last bin.
            ind = np.digitize(col, self.bin_edges_[j]) - 1

            ind[is_in_range & (ind == bins)] = bins - 1

            prob              = np.zeros(n_samples)
            prob[is_in_range] = self.hist_[j][ind[is_in_range]] * bin_width

            with np.errstate(divide='ignore'):
                anomaly_score -= np.log(prob)

        return anomaly_score
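
# HBOS scores each sample by summing, over features, the negative log of
# the estimated bin probability (bin height times bin width). A compact
# single-feature sketch of the same computation with plain NumPy
# (illustrative only; the detector above loops over all features and also
# tracks the training range):
#
#     import numpy as np
#
#     x = np.array([0., 1., 2., 3., 4., 5., 6., 7., 8., 1000.])
#
#     hist, edges = np.histogram(x, bins='auto', density=True)
#     bin_width   = edges[1] - edges[0]
#
#     # Bin index per value; values on the rightmost edge fall back into
#     # the last bin, as in HBOS._anomaly_score.
#     ind                   = np.digitize(x, edges) - 1
#     ind[ind == hist.size] = hist.size - 1
#
#     prob = hist[ind] * bin_width  # estimated probability mass per bin
#
#     with np.errstate(divide='ignore'):
#         score = -np.log(prob)  # empty bins yield an infinite score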
class KDE(BaseOutlierDetector):
    """Outlier detector using Kernel Density Estimation (KDE).

    Parameters
    ----------
    algorithm : str, default 'auto'
        Tree algorithm to use. Valid algorithms are
        ['kd_tree'|'ball_tree'|'auto'].

    atol : float, default 0.0
        Desired absolute tolerance of the result.

    bandwidth : float, default 1.0
        Bandwidth of the kernel.

    breadth_first : bool, default True
        If True, use a breadth-first approach to the problem. Otherwise, use
        a depth-first approach.

    contamination : float, default 0.1
        Proportion of outliers in the data set. Used to define the threshold.

    kernel : str, default 'gaussian'
        Kernel to use. Valid kernels are
        ['gaussian'|'tophat'|'epanechnikov'|'exponential'|'linear'|'cosine'].

    leaf_size : int, default 40
        Leaf size of the underlying tree.

    metric : str, default 'euclidean'
        Distance metric to use.

    rtol : float, default 0.0
        Desired relative tolerance of the result.

    metric_params : dict, default None
        Additional parameters to be passed to the requested metric.

    Attributes
    ----------
    anomaly_score_ : array-like of shape (n_samples,)
        Anomaly score for each training sample.

    contamination_ : float
        Actual proportion of outliers in the data set.

    threshold_ : float
        Threshold.

    References
    ----------
    .. [#parzen62] Parzen, E.,
        "On estimation of a probability density function and mode,"
        Ann. Math. Statist., 33(3), pp. 1065-1076, 1962.

    Examples
    --------
    >>> import numpy as np
    >>> from kenchi.outlier_detection import KDE
    >>> X = np.array([
    ...     [0., 0.], [1., 1.], [2., 0.], [3., -1.], [4., 0.],
    ...     [5., 1.], [6., 0.], [7., -1.], [8., 0.], [1000., 1.]
    ... ])
    >>> det = KDE()
    >>> det.fit_predict(X)
    array([ 1,  1,  1,  1,  1,  1,  1,  1,  1, -1])
    """

    @property
    def X_(self):
        """array-like of shape (n_samples, n_features): Training data."""
        return self.estimator_.tree_.data

    def __init__(
        self, algorithm='auto', atol=0., bandwidth=1., breadth_first=True,
        contamination=0.1, kernel='gaussian', leaf_size=40,
        metric='euclidean', rtol=0., metric_params=None
    ):
        self.algorithm     = algorithm
        self.atol          = atol
        self.bandwidth     = bandwidth
        self.breadth_first = breadth_first
        self.contamination = contamination
        self.kernel        = kernel
        self.leaf_size     = leaf_size
        self.metric        = metric
        self.rtol          = rtol
        self.metric_params = metric_params

    def _check_is_fitted(self):
        super()._check_is_fitted()

        check_is_fitted(self, 'X_')

    def _fit(self, X):
        self.estimator_ = KernelDensity(
            algorithm     = self.algorithm,
            atol          = self.atol,
            bandwidth     = self.bandwidth,
            breadth_first = self.breadth_first,
            kernel        = self.kernel,
            leaf_size     = self.leaf_size,
            metric        = self.metric,
            rtol          = self.rtol,
            metric_params = self.metric_params
        ).fit(X)

        return self

    def _anomaly_score(self, X):
        return -self.estimator_.score_samples(X)
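
# As with GMM, the KDE anomaly score is the negative log-density reported
# by the underlying estimator. A minimal sketch with scikit-learn's
# KernelDensity; the bandwidth is the parameter that matters most in
# practice, and cross-validating it (e.g. with GridSearchCV) is a common
# choice:
#
#     import numpy as np
#     from sklearn.neighbors import KernelDensity
#
#     X = np.array([
#         [0., 0.], [1., 1.], [2., 0.], [3., -1.], [4., 0.],
#         [5., 1.], [6., 0.], [7., -1.], [8., 0.], [1000., 1.]
#     ])
#
#     kde           = KernelDensity(kernel='gaussian', bandwidth=1.).fit(X)
#     anomaly_score = -kde.score_samples(X)  # negative log-density
#     print(anomaly_score.argmax())          # most anomalous sample -> 9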
class SparseStructureLearning(BaseOutlierDetector):
    """Outlier detector using sparse structure learning.

    Parameters
    ----------
    alpha : float, default 0.01
        Regularization parameter.

    assume_centered : bool, default False
        If True, data are not centered before computation.

    contamination : float, default 0.1
        Proportion of outliers in the data set. Used to define the threshold.

    enet_tol : float, default 1e-04
        Tolerance for the elastic net solver used to calculate the descent
        direction. This parameter controls the accuracy of the search
        direction for a given column update, not of the overall parameter
        estimate. Only used for mode='cd'.

    max_iter : int, default 100
        Maximum number of iterations.

    mode : str, default 'cd'
        Lasso solver to use: coordinate descent ('cd') or LARS ('lars').

    tol : float, default 1e-04
        Tolerance to declare convergence.

    apcluster_params : dict, default None
        Additional parameters passed to
        ``sklearn.cluster.affinity_propagation``.

    Attributes
    ----------
    anomaly_score_ : array-like of shape (n_samples,)
        Anomaly score for each training sample.

    contamination_ : float
        Actual proportion of outliers in the data set.

    threshold_ : float
        Threshold.

    labels_ : array-like of shape (n_features,)
        Label of each feature.

    References
    ----------
    .. [#ide09] Ide, T., Lozano, C., Abe, N., and Liu, Y.,
        "Proximity-based anomaly detection using sparse structure learning,"
        In Proceedings of SDM, pp. 97-108, 2009.

    Examples
    --------
    >>> import numpy as np
    >>> from kenchi.outlier_detection import SparseStructureLearning
    >>> X = np.array([
    ...     [0., 0.], [1., 1.], [2., 0.], [3., -1.], [4., 0.],
    ...     [5., 1.], [6., 0.], [7., -1.], [8., 0.], [1000., 1.]
    ... ])
    >>> det = SparseStructureLearning()
    >>> det.fit_predict(X)
    array([ 1,  1,  1,  1,  1,  1,  1,  1,  1, -1])
    """

    @property
    def _apcluster_params(self):
        if self.apcluster_params is None:
            return dict()
        else:
            return self.apcluster_params

    @property
    def covariance_(self):
        """array-like of shape (n_features, n_features): Estimated
        covariance matrix.
        """
        return self.estimator_.covariance_

    @property
    def graphical_model_(self):
        """networkx Graph: GGM."""
        import networkx as nx

        return nx.from_numpy_matrix(np.tril(self.partial_corrcoef_, k=-1))

    @property
    def isolates_(self):
        """array-like of shape (n_isolates,): Indices of isolates."""
        import networkx as nx

        return np.array(list(nx.isolates(self.graphical_model_)))

    @property
    def location_(self):
        """array-like of shape (n_features,): Estimated location."""
        return self.estimator_.location_

    @property
    def n_iter_(self):
        """int: Number of iterations run."""
        return self.estimator_.n_iter_

    @property
    def partial_corrcoef_(self):
        """array-like of shape (n_features, n_features): Partial correlation
        coefficient matrix.
        """
        n_features, _    = self.precision_.shape
        diag             = np.diag(self.precision_)[np.newaxis]
        partial_corrcoef = - self.precision_ / np.sqrt(diag.T @ diag)

        partial_corrcoef.flat[::n_features + 1] = 1.

        return partial_corrcoef

    @property
    def precision_(self):
        """array-like of shape (n_features, n_features): Estimated precision
        matrix (pseudo-inverse of the covariance matrix).
        """
        return self.estimator_.precision_

    def __init__(
        self, alpha=0.01, assume_centered=False, contamination=0.1,
        enet_tol=1e-04, max_iter=100, mode='cd', tol=1e-04,
        apcluster_params=None
    ):
        self.alpha            = alpha
        self.apcluster_params = apcluster_params
        self.assume_centered  = assume_centered
        self.contamination    = contamination
        self.enet_tol         = enet_tol
        self.max_iter         = max_iter
        self.mode             = mode
        self.tol              = tol

    def _check_is_fitted(self):
        super()._check_is_fitted()

        check_is_fitted(
            self,
            [
                'covariance_', 'labels_', 'location_', 'n_iter_',
                'partial_corrcoef_', 'precision_'
            ]
        )

    def _fit(self, X):
        self.estimator_ = GraphicalLasso(
            alpha           = self.alpha,
            assume_centered = self.assume_centered,
            enet_tol        = self.enet_tol,
            max_iter        = self.max_iter,
            mode            = self.mode,
            tol             = self.tol
        ).fit(X)

        _, self.labels_ = affinity_propagation(
            self.partial_corrcoef_, **self._apcluster_params
        )

        return self

    def _anomaly_score(self, X):
        return self.estimator_.mahalanobis(X)
    def featurewise_anomaly_score(self, X):
        """Compute the feature-wise anomaly scores for each sample.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Data.

        Returns
        -------
        anomaly_score : array-like of shape (n_samples, n_features)
            Feature-wise anomaly scores for each sample.
        """
        self._check_is_fitted()

        X = self._check_array(X, estimator=self)

        # Negative conditional log-likelihood of each feature given the
        # others [#ide09]_: for feature j and precision matrix Lambda,
        # 0.5 * log(2 * pi / Lambda_jj)
        #     + (Lambda @ (x - mu))_j ** 2 / (2 * Lambda_jj).
        return 0.5 * np.log(
            2. * np.pi / np.diag(self.precision_)
        ) + 0.5 / np.diag(
            self.precision_
        ) * ((X - self.location_) @ self.precision_) ** 2
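    # Illustrative usage (not part of the original source): the feature-wise
    # scores can localize which variables make a sample anomalous, e.g.
    #
    #     det    = SparseStructureLearning().fit(X)
    #     scores = det.featurewise_anomaly_score(X)  # (n_samples, n_features)
    #     scores[-1].argmax()                        # most anomalous feature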
    def plot_graphical_model(self, **kwargs):
        """Plot the Gaussian Graphical Model (GGM).

        Parameters
        ----------
        ax : matplotlib Axes, default None
            Target axes instance.

        figsize : tuple, default None
            Tuple denoting figure size of the plot.

        filename : str, default None
            If provided, save the current figure.

        random_state : int or RandomState instance, default None
            Seed of the pseudo random number generator.

        title : str, default 'GGM (n_clusters, n_features, n_isolates)'
            Axes title. To disable, pass None.

        **kwargs : dict
            Other keywords passed to ``nx.draw_networkx``.

        Returns
        -------
        ax : matplotlib Axes
            Axes on which the plot was drawn.
        """
        self._check_is_fitted()

        n_clusters  = np.max(self.labels_) + 1
        n_isolates, = self.isolates_.shape
        title       = (
            f'GGM ('
            f'n_clusters={n_clusters}, '
            f'n_features={self.n_features_}, '
            f'n_isolates={n_isolates}'
            f')'
        )

        kwargs['G'] = self.graphical_model_

        kwargs.setdefault('node_color', self.labels_)
        kwargs.setdefault('title', title)

        return plot_graphical_model(**kwargs)
    def plot_partial_corrcoef(self, **kwargs):
        """Plot the partial correlation coefficient matrix.

        Parameters
        ----------
        ax : matplotlib Axes, default None
            Target axes instance.

        cbar : bool, default True
            If True, draw a colorbar.

        figsize : tuple, default None
            Tuple denoting figure size of the plot.

        filename : str, default None
            If provided, save the current figure.

        title : str, default 'Partial correlation'
            Axes title. To disable, pass None.

        **kwargs : dict
            Other keywords passed to ``ax.pcolormesh``.

        Returns
        -------
        ax : matplotlib Axes
            Axes on which the plot was drawn.
        """
        self._check_is_fitted()

        kwargs['partial_corrcoef'] = self.partial_corrcoef_

        return plot_partial_corrcoef(**kwargs)
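
# The two quantities this detector derives from GraphicalLasso can be
# reproduced directly. A hedged standalone sketch (synthetic data and an
# illustrative alpha, not kenchi's defaults in any real workflow):
#
#     import numpy as np
#     from sklearn.covariance import GraphicalLasso
#
#     rng = np.random.default_rng(0)
#     X   = rng.normal(size=(100, 5))
#
#     glasso = GraphicalLasso(alpha=0.01).fit(X)
#
#     # Partial correlation from the precision matrix Lambda, as in
#     # partial_corrcoef_ above:
#     # rho_ij = -Lambda_ij / sqrt(Lambda_ii * Lambda_jj), unit diagonal.
#     Lambda       = glasso.precision_
#     d            = np.sqrt(np.diag(Lambda))
#     partial_corr = -Lambda / np.outer(d, d)
#     np.fill_diagonal(partial_corr, 1.)
#
#     # Squared Mahalanobis distance, as in _anomaly_score above.
#     anomaly_score = glasso.mahalanobis(X)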