Source code for kenchi.outlier_detection.clustering_based

import numpy as np
from sklearn.cluster import MiniBatchKMeans as _MiniBatchKMeans
from sklearn.utils.validation import check_is_fitted

from .base import BaseOutlierDetector

__all__ = ['MiniBatchKMeans']


class MiniBatchKMeans(BaseOutlierDetector):
    """Outlier detector built on mini-batch K-means clustering.

    The hyper-parameters below (except ``contamination``) are stored
    unchanged and forwarded to scikit-learn's ``MiniBatchKMeans`` when the
    detector is fitted.

    Parameters
    ----------
    batch_size : int, default 100
        Size of the mini batches.

    contamination : float, default 0.1
        Proportion of outliers in the data set. Used to define the
        threshold.

    init : str or array-like, default 'k-means++'
        Method for initialization. Valid string options are
        ['k-means++'|'random'].

    init_size : int or None, default None
        Number of samples to randomly sample for speeding up the
        initialization. ``None`` lets scikit-learn choose the size
        (nominally ``3 * batch_size``).

    max_iter : int, default 100
        Maximum number of iterations.

    max_no_improvement : int, default 10
        Control early stopping based on the consecutive number of mini
        batches that does not yield an improvement on the smoothed inertia.
        To disable convergence detection based on inertia, set
        ``max_no_improvement`` to None.

    n_clusters : int, default 8
        Number of clusters.

    n_init : int, default 3
        Number of initializations to perform.

    random_state : int or RandomState instance, default None
        Seed of the pseudo random number generator.

    reassignment_ratio : float, default 0.01
        Control the fraction of the maximum number of counts for a center
        to be reassigned.

    tol : float, default 0.0
        Tolerance to declare convergence.

    Attributes
    ----------
    anomaly_score_ : array-like of shape (n_samples,)
        Anomaly score for each training data.

    threshold_ : float
        Threshold.

    cluster_centers_ : array-like of shape (n_clusters, n_features)
        Coordinates of cluster centers.

    inertia_ : float
        Value of the inertia criterion associated with the chosen
        partition.

    labels_ : array-like of shape (n_samples,)
        Label of each point.
    """

    def __init__(
        self,
        batch_size=100,
        contamination=0.1,
        init='k-means++',
        init_size=None,
        max_iter=100,
        max_no_improvement=10,
        n_clusters=8,
        n_init=3,
        random_state=None,
        reassignment_ratio=0.01,
        tol=0.0
    ):
        # ``contamination`` is handled by the base detector; everything
        # else is kept verbatim for the wrapped clustering estimator.
        super().__init__(contamination=contamination)

        self.batch_size = batch_size
        self.init = init
        self.init_size = init_size
        self.max_iter = max_iter
        self.max_no_improvement = max_no_improvement
        self.n_clusters = n_clusters
        self.n_init = n_init
        self.random_state = random_state
        self.reassignment_ratio = reassignment_ratio
        self.tol = tol

    # The fitted attributes below are read straight from the wrapped
    # estimator, so they only exist once ``_fit`` has created it.

    @property
    def cluster_centers_(self):
        return self._estimator.cluster_centers_

    @property
    def inertia_(self):
        return self._estimator.inertia_

    @property
    def labels_(self):
        return self._estimator.labels_

    def _fit(self, X):
        """Fit the wrapped mini-batch K-means estimator on ``X``.

        Returns
        -------
        self : MiniBatchKMeans
            This detector, with ``_estimator`` set to the fitted model.
        """
        kmeans_params = {
            'batch_size': self.batch_size,
            'init': self.init,
            'init_size': self.init_size,
            'max_iter': self.max_iter,
            'max_no_improvement': self.max_no_improvement,
            'n_clusters': self.n_clusters,
            'n_init': self.n_init,
            'random_state': self.random_state,
            'reassignment_ratio': self.reassignment_ratio,
            'tol': self.tol,
        }
        self._estimator = _MiniBatchKMeans(**kmeans_params).fit(X)

        return self

    def _anomaly_score(self, X):
        """Return, for each row, the smallest value of ``transform(X)``.

        For scikit-learn's K-means estimators ``transform`` yields the
        distance from each sample to every cluster center, so the row-wise
        minimum is the distance to the nearest center.
        """
        center_distances = self._estimator.transform(X)

        return np.min(center_distances, axis=1)

    def score(self, X, y=None):
        """Compute the opposite value of the given data on the K-means
        objective.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Data.

        y : ignored

        Returns
        -------
        score : float
            Opposite value of the given data on the K-means objective.
        """
        # Fail early if the detector has not been fitted yet.
        check_is_fitted(self, '_estimator')

        # Validate the input against the feature count recorded on the
        # detector before delegating to the wrapped estimator.
        X = self._check_array(X, n_features=self._n_features, estimator=self)

        return self._estimator.score(X)