from abc import abstractmethod, ABC
import numpy as np
from scipy.stats import norm
from sklearn.base import BaseEstimator
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted
from ..visualization import plot_anomaly_score, plot_roc_curve
# Public names of this module, i.e. what ``from <module> import *`` exports.
__all__ = ['is_outlier_detector', 'BaseOutlierDetector']
def is_outlier_detector(estimator):
    """Return True if the given estimator is (probably) an outlier detector.

    The check is purely attribute based: the estimator qualifies when it
    exposes ``_estimator_type`` equal to ``'outlier_detector'``. Objects
    without that attribute are treated as "not an outlier detector" rather
    than raising.

    Parameters
    ----------
    estimator : object
        Estimator object to test.

    Returns
    -------
    out : bool
        True if estimator is an outlier detector and False otherwise.
    """
    # A missing attribute falls back to None, which never equals the tag.
    estimator_type = getattr(estimator, '_estimator_type', None)

    return estimator_type == 'outlier_detector'
class BaseOutlierDetector(BaseEstimator, ABC):
    """Base class for all outlier detectors in kenchi.

    Subclasses implement ``_fit`` and ``_anomaly_score``. This class builds
    parameter and input validation, threshold selection and the public
    ``fit`` / ``predict`` / ``decision_function`` / ``anomaly_score`` API on
    top of those two hooks.

    Parameters
    ----------
    contamination : float, default 0.1
        Must lie in (0.0, 0.5]. The decision threshold is the
        ``100 * (1 - contamination)``-th percentile of the anomaly scores
        of the training data.

    Attributes
    ----------
    anomaly_score_ : array-like of shape (n_samples,)
        Anomaly score of each training sample. Set by ``fit``.

    threshold_ : float
        Threshold derived from ``anomaly_score_``. Set by ``fit``.

    References
    ----------
    .. [#kriegel11] Kriegel, H.-P., Kroger, P., Schubert E., and Zimek, A.,
        "Interpreting and unifying outlier scores,"
        In Proceedings of SDM'11, pp. 13-24, 2011.
    """

    # Tag inspected by ``is_outlier_detector``.
    _estimator_type = 'outlier_detector'

    @abstractmethod
    def __init__(self, contamination=0.1):
        # Only stored here; the value is validated by ``_check_params``,
        # which ``fit`` calls.
        self.contamination = contamination

    def _check_params(self):
        """Check validity of parameters and raise ValueError if not valid.

        Raises
        ------
        ValueError
            If ``contamination`` is not in the interval (0.0, 0.5].
        """
        if 0. < self.contamination <= 0.5:
            return

        raise ValueError(
            f'contamination must be in (0.0, 0.5] '
            f'but was {self.contamination}'
        )

    def _check_array(self, X, n_features=None, **kwargs):
        """Check validity of the array and raise ValueError if not valid.

        Parameters
        ----------
        X : array-like
            Input to validate with ``sklearn.utils.check_array``.

        n_features : int, default None
            If given, the number of columns ``X`` is required to have.

        **kwargs : dict
            Other keywords passed to ``check_array``.

        Returns
        -------
        X : array
            The array returned by ``check_array``.

        Raises
        ------
        ValueError
            If ``n_features`` is given and does not match the number of
            columns of ``X``.
        """
        X = check_array(X, **kwargs)

        # Unpacking (rather than indexing) also requires X to be 2-D.
        _, found_features = X.shape

        mismatch = n_features is not None and found_features != n_features

        if mismatch:
            raise ValueError(
                f'X is expected to have {n_features} features '
                f'but had {found_features} features'
            )

        return X

    def _get_threshold(self):
        """Get the threshold according to the derived anomaly scores.

        Returns the ``100 * (1 - contamination)``-th percentile of
        ``anomaly_score_``.
        """
        scores = self.anomaly_score_
        percentile = 100. * (1. - self.contamination)

        return np.percentile(scores, percentile)

    def _get_rv(self):
        """Get the RV object according to the derived anomaly scores.

        A normal distribution is fitted to ``anomaly_score_`` and returned
        as a frozen ``scipy.stats.norm`` object. Its CDF is what the
        normalized anomaly score is computed from.
        """
        mean, std = norm.fit(self.anomaly_score_)

        return norm(loc=mean, scale=std)

    @abstractmethod
    def _fit(self, X):
        """Fit the detector to ``X``; implemented by subclasses.

        Called by ``fit`` with the array returned by ``_check_array``.
        """

    @abstractmethod
    def _anomaly_score(self, X):
        """Return the raw anomaly scores of ``X``; implemented by subclasses.

        Called by ``fit`` and ``anomaly_score`` with the array returned by
        ``_check_array``.
        """

    def fit_predict(self, X, y=None):
        """Fit the model according to the given training data and predict if a
        particular training sample is an outlier or not.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training Data.

        y : ignored

        Returns
        -------
        y_pred : array-like of shape (n_samples,)
            Return -1 for outliers and +1 for inliers.

        Raises
        ------
        ValueError
            If the estimator has a truthy ``novelty`` attribute.
        """
        # Estimators without a ``novelty`` attribute are allowed through.
        novelty = getattr(self, 'novelty', False)

        if novelty:
            raise ValueError(
                'fit_predict is not available when novelty=True, use '
                'novelty=False if you want to predict on the training data'
            )

        fitted = self.fit(X)

        return fitted.predict()

    def fit(self, X, y=None):
        """Fit the model according to the given training data.

        Validates the parameters and ``X``, delegates to ``_fit``, then
        stores the training anomaly scores (``anomaly_score_``) and the
        threshold derived from them (``threshold_``).

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data.

        y : ignored

        Returns
        -------
        self : object
            Return self.
        """
        self._check_params()

        X = self._check_array(X, estimator=self)

        # Remembered so that later inputs can be checked for the same width.
        _, n_features = X.shape
        self._n_features = n_features

        self._fit(X)

        # Order matters: the threshold and the fitted distribution are both
        # derived from the training scores.
        self.anomaly_score_ = self._anomaly_score(X)
        self.threshold_ = self._get_threshold()
        self._rv = self._get_rv()

        return self

    def predict(self, X=None, threshold=None):
        """Predict if a particular sample is an outlier or not.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features), default None
            Data. If None, predict if a particular training sample is an
            outlier or not.

        threshold : float, default None
            User-provided threshold. If None, ``threshold_`` is used.

        Returns
        -------
        y_pred : array-like of shape (n_samples,)
            Return -1 for outliers and +1 for inliers.
        """
        y_score = self.decision_function(X, threshold=threshold)

        # A score of exactly zero (anomaly score == threshold) is an inlier.
        return np.where(y_score >= 0., 1, -1)

    def decision_function(self, X=None, threshold=None):
        """Compute the decision function of the given samples.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features), default None
            Data. If None, compute the decision function of the given training
            samples.

        threshold : float, default None
            User-provided threshold. If None, ``threshold_`` is used.

        Returns
        -------
        y_score : array-like of shape (n_samples,)
            Shifted opposite of the anomaly score for each sample
            (``threshold - anomaly_score``). Negative scores represent
            outliers and positive scores represent inliers.
        """
        # Computed first so that an unfitted estimator fails before
        # ``threshold_`` is accessed.
        scores = self.anomaly_score(X)

        cutoff = self.threshold_ if threshold is None else threshold

        return cutoff - scores

    def anomaly_score(self, X=None, normalize=False):
        """Compute the anomaly score for each sample.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features), default None
            Data. If None, compute the anomaly score for each training sample.

        normalize : bool, default False
            If True, return the normalized anomaly score, i.e.
            ``max(0, 2 * cdf(score) - 1)`` where ``cdf`` belongs to the
            normal distribution fitted to the training scores. The result
            lies in [0, 1].

        Returns
        -------
        anomaly_score : array-like of shape (n_samples,)
            Anomaly score for each sample.

        Raises
        ------
        ValueError
            If ``X`` is given and the estimator has a falsy ``novelty``
            attribute, or if ``X`` does not have the number of features
            seen during ``fit``.
        """
        check_is_fitted(self, 'anomaly_score_')

        if X is None:
            # Reuse the scores cached by ``fit``.
            scores = self.anomaly_score_
        elif getattr(self, 'novelty', True):
            # Estimators without a ``novelty`` attribute may score new data.
            X = self._check_array(
                X, n_features=self._n_features, estimator=self
            )
            scores = self._anomaly_score(X)
        else:
            raise ValueError(
                'anomaly_score is not available when novelty=False, use '
                'novelty=True if you want to predict on new unseen data'
            )

        if not normalize:
            return scores

        return np.maximum(0., 2. * self._rv.cdf(scores) - 1.)

    def plot_anomaly_score(self, X=None, normalize=False, **kwargs):
        """Plot the anomaly score for each sample.

        The scores and the threshold are computed here and always override
        any ``anomaly_score`` / ``threshold`` entries in ``kwargs``. All
        keywords are then forwarded to the module-level
        ``plot_anomaly_score`` function.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features), default None
            Data. If None, plot the anomaly score for each training sample.

        normalize : bool, default False
            If True, plot the normalized anomaly score (and the normalized
            threshold).

        ax : matplotlib Axes, default None
            Target axes instance.

        bins : int, str or array-like, default 'auto'
            Number of hist bins.

        figsize : tuple, default None
            Tuple denoting figure size of the plot.

        filename : str, default None
            If provided, save the current figure.

        hist : bool, default True
            If True, plot a histogram of anomaly scores.

        kde : bool, default True
            If True, plot a gaussian kernel density estimate.

        label : str, default is the class name of the estimator
            Label forwarded to the plotting function.

        title : string, default None
            Axes title. To disable, pass None.

        xlabel : string, default 'Samples'
            X axis title label. To disable, pass None.

        xlim : tuple, default None
            Tuple passed to `ax.xlim`.

        ylabel : string, default 'Anomaly score'
            Y axis title label. To disable, pass None.

        ylim : tuple, optional
            Tuple passed to `ax.ylim`. Defaults to ``(0., 1.05)`` when
            ``normalize`` is True and to ``(0., 2. * threshold_)`` otherwise.

        **kwargs : dict
            Other keywords passed to `ax.plot`.

        Returns
        -------
        ax : matplotlib Axes
            Axes on which the plot was drawn.
        """
        kwargs['anomaly_score'] = self.anomaly_score(X, normalize=normalize)
        kwargs.setdefault('label', self.__class__.__name__)

        if normalize:
            # Put the threshold on the same [0, 1] scale as the scores.
            threshold = np.maximum(
                0., 2. * self._rv.cdf(self.threshold_) - 1.
            )
            default_ylim = (0., 1.05)
        else:
            threshold = self.threshold_
            default_ylim = (0., 2. * self.threshold_)

        kwargs['threshold'] = threshold
        kwargs.setdefault('ylim', default_ylim)

        # Inside a method body the bare name resolves to the module-level
        # function imported from ``..visualization``, not to this method.
        return plot_anomaly_score(**kwargs)

    def plot_roc_curve(self, X, y, **kwargs):
        """Plot the Receiver Operating Characteristic (ROC) curve.

        ``y`` and ``decision_function(X)`` are passed as ``y_true`` and
        ``y_score`` and always override entries of the same name in
        ``kwargs``. All keywords are then forwarded to the module-level
        ``plot_roc_curve`` function.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Data.

        y : array-like of shape (n_samples,)
            Labels.

        ax : matplotlib Axes, default None
            Target axes instance.

        figsize: tuple, default None
            Tuple denoting figure size of the plot.

        filename : str, default None
            If provided, save the current figure.

        label : str, default is the class name of the estimator
            Label forwarded to the plotting function.

        title : string, default 'ROC curve'
            Axes title. To disable, pass None.

        xlabel : string, default 'FPR'
            X axis title label. To disable, pass None.

        ylabel : string, default 'TPR'
            Y axis title label. To disable, pass None.

        **kwargs : dict
            Other keywords passed to `ax.plot`.

        Returns
        -------
        ax : matplotlib Axes
            Axes on which the plot was drawn.
        """
        kwargs['y_true'] = y
        kwargs['y_score'] = self.decision_function(X)
        kwargs.setdefault('label', self.__class__.__name__)

        # Inside a method body the bare name resolves to the module-level
        # function imported from ``..visualization``, not to this method.
        return plot_roc_curve(**kwargs)