Source code for kenchi.outlier_detection.clustering_based

import numpy as np
from sklearn.cluster import MiniBatchKMeans as _MiniBatchKMeans
from sklearn.utils.validation import check_is_fitted

from .base import BaseOutlierDetector

# Names exported by ``from ... import *``: only the detector class defined
# in this module (the scikit-learn estimator is imported under a private alias).
__all__ = ['MiniBatchKMeans']


class MiniBatchKMeans(BaseOutlierDetector):
    """Outlier detector built on mini-batch K-means clustering.

    A scikit-learn ``MiniBatchKMeans`` estimator is fitted on the training
    data, and the anomaly score of a sample is the smallest value in its row
    of that estimator's ``transform`` output (see ``_anomaly_score``).

    Parameters
    ----------
    batch_size : int, optional, default 100
        Size of the mini batches.

    contamination : float, default 0.1
        Proportion of outliers in the data set. Used to define the
        threshold.

    init : str or array-like, default 'k-means++'
        Method for initialization. Valid options are
        ['k-means++'|'random'].

    init_size : int, default: 3 * batch_size
        Number of samples to randomly sample for speeding up the
        initialization.

    max_iter : int, default 100
        Maximum number of iterations.

    max_no_improvement : int, default 10
        Control early stopping based on the consecutive number of mini
        batches that does not yield an improvement on the smoothed inertia.
        To disable convergence detection based on inertia, set
        max_no_improvement to None.

    n_clusters : int, default 8
        Number of clusters.

    n_init : int, default 3
        Number of initializations to perform.

    random_state : int or RandomState instance, default None
        Seed of the pseudo random number generator.

    reassignment_ratio : float, default 0.01
        Control the fraction of the maximum number of counts for a center
        to be reassigned.

    tol : float, default 0.0
        Tolerance to declare convergence.

    Attributes
    ----------
    anomaly_score_ : array-like of shape (n_samples,)
        Anomaly score for each training data.

    contamination_ : float
        Actual proportion of outliers in the data set.

    threshold_ : float
        Threshold.

    Examples
    --------
    >>> import numpy as np
    >>> from kenchi.outlier_detection import MiniBatchKMeans
    >>> X = np.array([
    ...     [0., 0.], [1., 1.], [2., 0.], [3., -1.], [4., 0.],
    ...     [5., 1.], [6., 0.], [7., -1.], [8., 0.], [1000., 1.]
    ... ])
    >>> det = MiniBatchKMeans(n_clusters=1, random_state=0)
    >>> det.fit_predict(X)
    array([ 1,  1,  1,  1,  1,  1,  1,  1,  1, -1])
    """

    def __init__(
        self, batch_size=100, contamination=0.1,
        init='k-means++', init_size=None, max_iter=100,
        max_no_improvement=10, n_clusters=8, n_init=3,
        random_state=None, reassignment_ratio=0.01, tol=0.0
    ):
        # Hyperparameters are stored unmodified; apart from
        # ``contamination`` they are only read again in ``_fit``.
        self.batch_size = batch_size
        self.contamination = contamination
        self.init = init
        self.init_size = init_size
        self.max_iter = max_iter
        self.max_no_improvement = max_no_improvement
        self.n_clusters = n_clusters
        self.n_init = n_init
        self.random_state = random_state
        self.reassignment_ratio = reassignment_ratio
        self.tol = tol

    # The three properties below are read-only views onto the fitted
    # scikit-learn estimator held in ``self.estimator_``.

    @property
    def cluster_centers_(self):
        """array-like of shape (n_clusters, n_features): Coordinates of
        cluster centers.
        """
        fitted = self.estimator_
        return fitted.cluster_centers_

    @property
    def inertia_(self):
        """float: Value of the inertia criterion associated with the chosen
        partition.
        """
        fitted = self.estimator_
        return fitted.inertia_

    @property
    def labels_(self):
        """array-like of shape (n_samples,): Label of each point."""
        fitted = self.estimator_
        return fitted.labels_

    def _check_is_fitted(self):
        """Run the base-class fitted check, then require the clustering
        attributes exposed by this class.
        """
        super()._check_is_fitted()

        clustering_attributes = ['cluster_centers_', 'inertia_', 'labels_']
        check_is_fitted(self, clustering_attributes)

    def _fit(self, X):
        """Fit a scikit-learn ``MiniBatchKMeans`` on ``X``.

        The fitted estimator is stored as ``self.estimator_``.

        Returns
        -------
        self : MiniBatchKMeans
            This detector.
        """
        kmeans = _MiniBatchKMeans(
            batch_size=self.batch_size,
            init=self.init,
            init_size=self.init_size,
            max_iter=self.max_iter,
            max_no_improvement=self.max_no_improvement,
            n_clusters=self.n_clusters,
            n_init=self.n_init,
            random_state=self.random_state,
            reassignment_ratio=self.reassignment_ratio,
            tol=self.tol,
        )
        # Assign only the result of ``fit`` so that ``estimator_`` is never
        # set when fitting raises.
        self.estimator_ = kmeans.fit(X)

        return self

    def _anomaly_score(self, X):
        """Return, for each row of ``X``, the minimum over the columns of
        the fitted estimator's ``transform`` output.
        """
        center_distances = self.estimator_.transform(X)
        return np.min(center_distances, axis=1)