import numpy as np
from sklearn.neighbors import DistanceMetric, NearestNeighbors
from sklearn.utils import check_random_state
from sklearn.utils.validation import check_is_fitted
from .base import BaseOutlierDetector
__all__ = ['KNN', 'OneTimeSampling']
[docs]class KNN(BaseOutlierDetector):
"""Outlier detector using k-nearest neighbors algorithm.
Parameters
----------
aggregate : bool, default False
If True, return the sum of the distances from k nearest neighbors as
the anomaly score.
algorithm : str, default 'auto'
Tree algorithm to use. Valid algorithms are
['kd_tree'|'ball_tree'|'auto'].
contamination : float, default 0.1
Proportion of outliers in the data set. Used to define the threshold.
leaf_size : int, default 30
Leaf size of the underlying tree.
metric : str or callable, default 'minkowski'
Distance metric to use.
novelty : bool, default False
If True, you can use predict, decision_function and anomaly_score on
new unseen data and not on the training data.
n_jobs : int, default 1
Number of jobs to run in parallel. If -1, then the number of jobs is
set to the number of CPU cores.
n_neighbors : int, default 20
Number of neighbors.
p : int, default 2
Power parameter for the Minkowski metric.
metric_params : dict, default None
Additioal parameters passed to the requested metric.
Attributes
----------
anomaly_score_ : array-like of shape (n_samples,)
Anomaly score for each training data.
contamination_ : float
Actual proportion of outliers in the data set.
threshold_ : float
Threshold.
n_neighbors_ : int
Actual number of neighbors used for ``kneighbors`` queries.
References
----------
.. [#angiulli02] Angiulli, F., and Pizzuti, C.,
"Fast outlier detection in high dimensional spaces,"
In Proceedings of PKDD, pp. 15-27, 2002.
.. [#ramaswamy00] Ramaswamy, S., Rastogi, R., and Shim, K.,
"Efficient algorithms for mining outliers from large data sets,"
In Proceedings of SIGMOD, pp. 427-438, 2000.
Examples
--------
>>> import numpy as np
>>> from kenchi.outlier_detection import KNN
>>> X = np.array([
... [0., 0.], [1., 1.], [2., 0.], [3., -1.], [4., 0.],
... [5., 1.], [6., 0.], [7., -1.], [8., 0.], [1000., 1.]
... ])
>>> det = KNN(n_neighbors=3)
>>> det.fit_predict(X)
array([ 1, 1, 1, 1, 1, 1, 1, 1, 1, -1])
"""
@property
def X_(self):
"""array-like of shape (n_samples, n_features): Training data.
"""
return self.estimator_._fit_X
def __init__(
self, aggregate=False, algorithm='auto', contamination=0.1,
leaf_size=30, metric='minkowski', novelty=False, n_jobs=1,
n_neighbors=20, p=2, metric_params=None
):
self.aggregate = aggregate
self.algorithm = algorithm
self.contamination = contamination
self.leaf_size = leaf_size
self.metric = metric
self.novelty = novelty
self.n_jobs = n_jobs
self.n_neighbors = n_neighbors
self.p = p
self.metric_params = metric_params
def _check_is_fitted(self):
super()._check_is_fitted()
check_is_fitted(self, ['n_neighbors_', 'X_'])
def _fit(self, X):
n_samples, _ = X.shape
self.n_neighbors_ = np.maximum(
1, np.minimum(self.n_neighbors, n_samples - 1)
)
self.estimator_ = NearestNeighbors(
algorithm = self.algorithm,
leaf_size = self.leaf_size,
metric = self.metric,
n_jobs = self.n_jobs,
n_neighbors = self.n_neighbors_,
p = self.p,
metric_params = self.metric_params
).fit(X)
return self
def _anomaly_score(self, X):
if X is self.X_:
dist, _ = self.estimator_.kneighbors()
else:
dist, _ = self.estimator_.kneighbors(X)
if self.aggregate:
return np.sum(dist, axis=1)
else:
return np.max(dist, axis=1)
[docs]class OneTimeSampling(BaseOutlierDetector):
"""One-time sampling.
Parameters
----------
contamination : float, default 0.1
Proportion of outliers in the data set. Used to define the threshold.
metric : str, default 'euclidean'
Distance metric to use.
novelty : bool, default False
If True, you can use predict, decision_function and anomaly_score on
new unseen data and not on the training data.
n_subsamples : int, default 20
Number of random samples to be used.
random_state : int, RandomState instance, default None
Seed of the pseudo random number generator.
metric_params : dict, default None
Additional parameters passed to the requested metric.
Attributes
----------
anomaly_score_ : array-like of shape (n_samples,)
Anomaly score for each training data.
contamination_ : float
Actual proportion of outliers in the data set.
threshold_ : float
Threshold.
subsamples_ : array-like of shape (n_subsamples,)
Indices of subsamples.
S_ : array-like of shape (n_subsamples, n_features)
Subset of the given training data.
References
----------
.. [#sugiyama13] Sugiyama, M., and Borgwardt, K.,
"Rapid distance-based outlier detection via sampling,"
Advances in NIPS, pp. 467-475, 2013.
Examples
--------
>>> import numpy as np
>>> from kenchi.outlier_detection import OneTimeSampling
>>> X = np.array([
... [0., 0.], [1., 1.], [2., 0.], [3., -1.], [4., 0.],
... [5., 1.], [6., 0.], [7., -1.], [8., 0.], [1000., 1.]
... ])
>>> det = OneTimeSampling(n_subsamples=3, random_state=0)
>>> det.fit_predict(X)
array([ 1, 1, 1, 1, 1, 1, 1, 1, 1, -1])
"""
@property
def _metric_params(self):
if self.metric_params is None:
return dict()
else:
return self.metric_params
def __init__(
self, contamination=0.1, metric='euclidean', novelty=False,
n_subsamples=20, random_state=None, metric_params=None
):
self.contamination = contamination
self.metric = metric
self.novelty = novelty
self.n_subsamples = n_subsamples
self.random_state = random_state
self.metric_params = metric_params
def _check_params(self):
super()._check_params()
if self.n_subsamples <= 0:
raise ValueError(
f'n_subsamples must be positive but was {self.n_subsamples}'
)
def _check_array(self, X, **kwargs):
X = super()._check_array(X, **kwargs)
n_samples, _ = X.shape
if self.n_subsamples >= n_samples:
raise ValueError(
f'n_subsamples must be smaller than {n_samples} '
f'but was {self.n_subsamples}'
)
return X
def _check_is_fitted(self):
super()._check_is_fitted()
check_is_fitted(self, ['subsamples_', 'S_'])
def _fit(self, X):
n_samples, _ = X.shape
rnd = check_random_state(self.random_state)
subsamples = rnd.choice(
n_samples, size=self.n_subsamples, replace=False
)
# sort again as choice does not guarantee sorted order
self.subsamples_ = np.sort(subsamples)
self.S_ = X[self.subsamples_]
self.metric_ = DistanceMetric.get_metric(
self.metric, **self._metric_params
)
return self
def _anomaly_score(self, X):
return np.min(self.metric_.pairwise(X, self.S_), axis=1)