# Source code for kenchi.outlier_detection.distance_based

import numpy as np
from sklearn.neighbors import DistanceMetric, NearestNeighbors
from sklearn.utils import check_random_state
from sklearn.utils.validation import check_is_fitted

from .base import BaseOutlierDetector

__all__ = ['KNN', 'OneTimeSampling']


class KNN(BaseOutlierDetector):
    """Outlier detector using the k-nearest neighbors algorithm.

    Parameters
    ----------
    aggregate : bool, default False
        If True, return the sum of the distances from k nearest neighbors
        as the anomaly score.

    algorithm : str, default 'auto'
        Tree algorithm to use. Valid algorithms are
        ['kd_tree'|'ball_tree'|'auto'].

    contamination : float, default 0.1
        Proportion of outliers in the data set. Used to define the threshold.

    leaf_size : int, default 30
        Leaf size of the underlying tree.

    metric : str or callable, default 'minkowski'
        Distance metric to use.

    novelty : bool, default False
        If True, you can use predict, decision_function and anomaly_score on
        new unseen data and not on the training data.

    n_jobs : int, default 1
        Number of jobs to run in parallel. If -1, then the number of jobs is
        set to the number of CPU cores.

    n_neighbors : int, default 20
        Number of neighbors.

    p : int, default 2
        Power parameter for the Minkowski metric.

    metric_params : dict, default None
        Additional parameters passed to the requested metric.

    Attributes
    ----------
    anomaly_score_ : array-like of shape (n_samples,)
        Anomaly score for each training data.

    contamination_ : float
        Actual proportion of outliers in the data set.

    threshold_ : float
        Threshold.

    n_neighbors_ : int
        Actual number of neighbors used for ``kneighbors`` queries.

    References
    ----------
    .. [#angiulli02] Angiulli, F., and Pizzuti, C.,
        "Fast outlier detection in high dimensional spaces,"
        In Proceedings of PKDD, pp. 15-27, 2002.

    .. [#ramaswamy00] Ramaswamy, S., Rastogi, R., and Shim, K.,
        "Efficient algorithms for mining outliers from large data sets,"
        In Proceedings of SIGMOD, pp. 427-438, 2000.

    Examples
    --------
    >>> import numpy as np
    >>> from kenchi.outlier_detection import KNN
    >>> X = np.array([
    ...     [0., 0.], [1., 1.], [2., 0.], [3., -1.], [4., 0.],
    ...     [5., 1.], [6., 0.], [7., -1.], [8., 0.], [1000., 1.]
    ... ])
    >>> det = KNN(n_neighbors=3)
    >>> det.fit_predict(X)
    array([ 1,  1,  1,  1,  1,  1,  1,  1,  1, -1])
    """

    @property
    def X_(self):
        """array-like of shape (n_samples, n_features): Training data."""

        # NearestNeighbors stores the fitted data on this private attribute.
        return self.estimator_._fit_X

    def __init__(
        self, aggregate=False, algorithm='auto', contamination=0.1,
        leaf_size=30, metric='minkowski', novelty=False, n_jobs=1,
        n_neighbors=20, p=2, metric_params=None
    ):
        # Store hyperparameters verbatim (scikit-learn estimator convention:
        # no validation or transformation inside __init__).
        self.aggregate = aggregate
        self.algorithm = algorithm
        self.contamination = contamination
        self.leaf_size = leaf_size
        self.metric = metric
        self.novelty = novelty
        self.n_jobs = n_jobs
        self.n_neighbors = n_neighbors
        self.p = p
        self.metric_params = metric_params

    def _check_is_fitted(self):
        super()._check_is_fitted()

        check_is_fitted(self, ['n_neighbors_', 'X_'])

    def _fit(self, X):
        n_samples, _ = X.shape

        # Clamp the requested neighbor count to [1, n_samples - 1]: a
        # kneighbors query cannot return more neighbors than there are
        # other training points.
        self.n_neighbors_ = np.maximum(
            1, np.minimum(self.n_neighbors, n_samples - 1)
        )

        self.estimator_ = NearestNeighbors(
            algorithm=self.algorithm,
            leaf_size=self.leaf_size,
            metric=self.metric,
            n_jobs=self.n_jobs,
            n_neighbors=self.n_neighbors_,
            p=self.p,
            metric_params=self.metric_params
        ).fit(X)

        return self

    def _anomaly_score(self, X):
        # When scoring the training data itself, call kneighbors() with no
        # argument so each point is excluded from its own neighborhood.
        if X is self.X_:
            dist, _ = self.estimator_.kneighbors()
        else:
            dist, _ = self.estimator_.kneighbors(X)

        # Either sum all k distances (aggregate mode) or take the distance
        # to the k-th nearest neighbor.
        return np.sum(dist, axis=1) if self.aggregate else np.max(dist, axis=1)
class OneTimeSampling(BaseOutlierDetector):
    """One-time sampling.

    Parameters
    ----------
    contamination : float, default 0.1
        Proportion of outliers in the data set. Used to define the threshold.

    metric : str, default 'euclidean'
        Distance metric to use.

    novelty : bool, default False
        If True, you can use predict, decision_function and anomaly_score on
        new unseen data and not on the training data.

    n_subsamples : int, default 20
        Number of random samples to be used.

    random_state : int, RandomState instance, default None
        Seed of the pseudo random number generator.

    metric_params : dict, default None
        Additional parameters passed to the requested metric.

    Attributes
    ----------
    anomaly_score_ : array-like of shape (n_samples,)
        Anomaly score for each training data.

    contamination_ : float
        Actual proportion of outliers in the data set.

    threshold_ : float
        Threshold.

    subsamples_ : array-like of shape (n_subsamples,)
        Indices of subsamples.

    S_ : array-like of shape (n_subsamples, n_features)
        Subset of the given training data.

    References
    ----------
    .. [#sugiyama13] Sugiyama, M., and Borgwardt, K.,
        "Rapid distance-based outlier detection via sampling,"
        Advances in NIPS, pp. 467-475, 2013.

    Examples
    --------
    >>> import numpy as np
    >>> from kenchi.outlier_detection import OneTimeSampling
    >>> X = np.array([
    ...     [0., 0.], [1., 1.], [2., 0.], [3., -1.], [4., 0.],
    ...     [5., 1.], [6., 0.], [7., -1.], [8., 0.], [1000., 1.]
    ... ])
    >>> det = OneTimeSampling(n_subsamples=3, random_state=0)
    >>> det.fit_predict(X)
    array([ 1,  1,  1,  1,  1,  1,  1,  1,  1, -1])
    """

    @property
    def _metric_params(self):
        # Normalize ``None`` to an empty dict so it can be unpacked with **.
        return {} if self.metric_params is None else self.metric_params

    def __init__(
        self, contamination=0.1, metric='euclidean', novelty=False,
        n_subsamples=20, random_state=None, metric_params=None
    ):
        # Store hyperparameters verbatim (scikit-learn estimator convention:
        # no validation or transformation inside __init__).
        self.contamination = contamination
        self.metric = metric
        self.novelty = novelty
        self.n_subsamples = n_subsamples
        self.random_state = random_state
        self.metric_params = metric_params

    def _check_params(self):
        super()._check_params()

        if self.n_subsamples <= 0:
            raise ValueError(
                f'n_subsamples must be positive but was {self.n_subsamples}'
            )

    def _check_array(self, X, **kwargs):
        X = super()._check_array(X, **kwargs)
        n_samples, _ = X.shape

        # Sampling without replacement requires strictly fewer subsamples
        # than data points.
        if self.n_subsamples >= n_samples:
            raise ValueError(
                f'n_subsamples must be smaller than {n_samples} '
                f'but was {self.n_subsamples}'
            )

        return X

    def _check_is_fitted(self):
        super()._check_is_fitted()

        check_is_fitted(self, ['subsamples_', 'S_'])

    def _fit(self, X):
        n_samples, _ = X.shape
        rnd = check_random_state(self.random_state)
        drawn = rnd.choice(n_samples, size=self.n_subsamples, replace=False)

        # sort again as choice does not guarantee sorted order
        self.subsamples_ = np.sort(drawn)
        self.S_ = X[self.subsamples_]

        self.metric_ = DistanceMetric.get_metric(
            self.metric, **self._metric_params
        )

        return self

    def _anomaly_score(self, X):
        # Score each sample by its distance to the nearest subsample.
        return np.min(self.metric_.pairwise(X, self.S_), axis=1)