from abc import abstractmethod, ABC
import numpy as np
from scipy.stats import norm
from sklearn.base import BaseEstimator
from sklearn.externals.joblib import dump
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted
from ..plotting import plot_anomaly_score, plot_roc_curve
from ..utils import check_contamination, check_novelty
__all__ = ['BaseOutlierDetector']
NEG_LABEL = -1
POS_LABEL = 1
[docs]class BaseOutlierDetector(BaseEstimator, ABC):
"""Base class for all outlier detectors in kenchi.
References
----------
.. [#kriegel11] Kriegel, H.-P., Kroger, P., Schubert, E., and Zimek, A.,
"Interpreting and unifying outlier scores,"
In Proceedings of SDM, pp. 13-24, 2011.
"""
_estimator_type = 'outlier_detector'
def _check_params(self):
"""Raise ValueError if parameters are not valid."""
if hasattr(self, 'contamination'):
check_contamination(self.contamination)
def _check_array(self, X, **kwargs):
"""Raise ValueError if the array is not valid."""
X = check_array(X, **kwargs)
_, n_features = X.shape
n_features_ = getattr(self, 'n_features_', n_features)
if n_features != n_features_:
raise ValueError(
f'X is expected to have {n_features_} features '
f'but had {n_features} features'
)
return X
def _check_is_fitted(self):
"""Raise NotFittedError if the estimator is not fitted."""
check_is_fitted(
self, [
'anomaly_score_', 'classes_', 'contamination_', 'n_features_',
'random_variable_', 'threshold_'
]
)
def _get_contamination(self):
"""Get the contamination according to the derived anomaly scores."""
if hasattr(self, 'contamination'):
return self.contamination
is_outlier = self.anomaly_score_ > self.threshold_
n_samples, = is_outlier.shape
n_outliers = np.sum(is_outlier)
return n_outliers / n_samples
def _get_threshold(self):
"""Get the threshold according to the derived anomaly scores."""
return np.percentile(
self.anomaly_score_,
100. * (1. - self.contamination),
interpolation = 'lower'
)
def _get_random_variable(self):
"""Get the RV object according to the derived anomaly scores."""
loc, scale = norm.fit(self.anomaly_score_)
return norm(loc=loc, scale=scale)
@abstractmethod
def _fit(self, X):
pass
@abstractmethod
def _anomaly_score(self, X):
pass
[docs] def fit(self, X, y=None):
"""Fit the model according to the given training data.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data.
y : ignored
Returns
-------
self : object
Return self.
"""
self._check_params()
X = self._check_array(X, estimator=self)
self._fit(X)
self.classes_ = np.array([NEG_LABEL, POS_LABEL])
_, self.n_features_ = X.shape
self.anomaly_score_ = self._anomaly_score(X)
self.threshold_ = self._get_threshold()
self.contamination_ = self._get_contamination()
self.random_variable_ = self._get_random_variable()
return self
[docs] def fit_predict(self, X, y=None):
"""Fit the model according to the given training data and predict if a
particular training sample is an outlier or not.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training Data.
y : ignored
Returns
-------
y_pred : array-like of shape (n_samples,)
Return -1 for outliers and +1 for inliers.
"""
if hasattr(self, 'novelty'):
check_novelty(self.novelty, 'fit_predict')
return self.fit(X).predict()
[docs] def predict(self, X=None, threshold=None):
"""Predict if a particular sample is an outlier or not.
Parameters
----------
X : array-like of shape (n_samples, n_features), default None
Data. If None, predict if a particular training sample is an
outlier or not.
threshold : float, default None
User-provided threshold.
Returns
-------
y_pred : array-like of shape (n_samples,)
Return -1 for outliers and +1 for inliers.
"""
return np.where(
self.decision_function(X, threshold=threshold) >= 0.,
POS_LABEL,
NEG_LABEL
)
[docs] def predict_proba(self, X=None):
"""Predict class probabilities for each sample.
Parameters
----------
X : array-like of shape (n_samples, n_features), default None
Data. If None, predict if a particular training sample is an
outlier or not.
Returns
-------
y_score : array-like of shape (n_samples, n_classes)
Class probabilities.
"""
anomaly_score = self.anomaly_score(X, normalize=True)
return np.concatenate([
anomaly_score[:, np.newaxis], 1. - anomaly_score[:, np.newaxis]
], axis=1)
[docs] def decision_function(self, X=None, threshold=None):
"""Compute the decision function of the given samples.
Parameters
----------
X : array-like of shape (n_samples, n_features), default None
Data. If None, compute the decision function of the given training
samples.
threshold : float, default None
User-provided threshold.
Returns
-------
shiftted_score_samples : array-like of shape (n_samples,)
Shifted opposite of the anomaly score for each sample. Negative
scores represent outliers and positive scores represent inliers.
"""
score_samples = self.score_samples(X)
if threshold is None:
threshold = self.threshold_
return score_samples + threshold
[docs] def score_samples(self, X=None):
"""Compute the opposite of the anomaly score for each sample.
Parameters
----------
X : array-like of shape (n_samples, n_features), default None
Data. If None, compute the opposite of the anomaly score for each
training sample.
Returns
-------
score_samples : array-like of shape (n_samples,)
Opposite of the anomaly score for each sample.
"""
return -self.anomaly_score(X)
[docs] def anomaly_score(self, X=None, normalize=False):
"""Compute the anomaly score for each sample.
Parameters
----------
X : array-like of shape (n_samples, n_features), default None
Data. If None, compute the anomaly score for each training sample.
normalize : bool, default False
If True, return the normalized anomaly score.
Returns
-------
anomaly_score : array-like of shape (n_samples,)
Anomaly score for each sample.
"""
self._check_is_fitted()
if X is None:
anomaly_score = self.anomaly_score_
if normalize:
return np.maximum(
0., 2. * self.random_variable_.cdf(anomaly_score) - 1.
)
else:
return anomaly_score
if hasattr(self, 'novelty'):
check_novelty(self.novelty, 'anomaly_score')
X = self._check_array(X, estimator=self)
anomaly_score = self._anomaly_score(X)
if normalize:
return np.maximum(
0., 2. * self.random_variable_.cdf(anomaly_score) - 1.
)
else:
return anomaly_score
[docs] def to_pickle(self, filename, **kwargs):
"""Persist an outlier detector object.
Parameters
----------
filename : str or pathlib.Path
Path of the file in which it is to be stored.
kwargs : dict
Other keywords passed to ``sklearn.externals.joblib.dump``.
Returns
-------
filenames : list
List of file names in which the data is stored.
"""
return dump(self, filename, **kwargs)
[docs] def plot_anomaly_score(self, X=None, normalize=False, **kwargs):
"""Plot the anomaly score for each sample.
Parameters
----------
X : array-like of shape (n_samples, n_features), default None
Data. If None, plot the anomaly score for each training samples.
normalize : bool, default False
If True, plot the normalized anomaly score.
ax : matplotlib Axes, default None
Target axes instance.
bins : int, str or array-like, default 'auto'
Number of hist bins.
figsize : tuple, default None
Tuple denoting figure size of the plot.
filename : str, default None
If provided, save the current figure.
hist : bool, default True
If True, plot a histogram of anomaly scores.
kde : bool, default True
If True, plot a gaussian kernel density estimate.
title : string, default None
Axes title. To disable, pass None.
xlabel : string, default 'Samples'
X axis title label. To disable, pass None.
xlim : tuple, default None
Tuple passed to ``ax.xlim``.
ylabel : string, default 'Anomaly score'
Y axis title label. To disable, pass None.
ylim : tuple, default None
Tuple passed to ``ax.ylim``.
**kwargs : dict
Other keywords passed to ``ax.plot``.
Returns
-------
ax : matplotlib Axes
Axes on which the plot was drawn.
"""
kwargs['anomaly_score'] = self.anomaly_score(X, normalize=normalize)
kwargs.setdefault('label', self.__class__.__name__)
if normalize:
kwargs.setdefault('ylim', (0., 1.05))
else:
kwargs['threshold'] = self.threshold_
kwargs.setdefault('ylim', (0., 2. * self.threshold_))
return plot_anomaly_score(**kwargs)
[docs] def plot_roc_curve(self, X, y, **kwargs):
"""Plot the Receiver Operating Characteristic (ROC) curve.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Data.
y : array-like of shape (n_samples,)
Labels.
ax : matplotlib Axes, default None
Target axes instance.
figsize: tuple, default None
Tuple denoting figure size of the plot.
filename : str, default None
If provided, save the current figure.
title : string, default 'ROC curve'
Axes title. To disable, pass None.
xlabel : string, default 'FPR'
X axis title label. To disable, pass None.
ylabel : string, default 'TPR'
Y axis title label. To disable, pass None.
**kwargs : dict
Other keywords passed to ``ax.plot``.
Returns
-------
ax : matplotlib Axes
Axes on which the plot was drawn.
"""
kwargs['y_true'] = y
kwargs['y_score'] = self.score_samples(X)
kwargs.setdefault('label', self.__class__.__name__)
return plot_roc_curve(**kwargs)