# Source code for MetricsReloaded.metrics.prob_pairwise_measures

# Copyright (c) Carole Sudre
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Probabilistic Pairwise Measures - :mod:`MetricsReloaded.metrics.prob_pairwise_measures`
=======================================================================================

This module provides classes for calculating :ref:`probabilistic
<probabilistic>` pairwise measures.

.. _probabilistic:

Calculating multi-threshold/probabilistic pairwise measures
-----------------------------------------------------------

.. autoclass:: ProbabilityPairwiseMeasures
    :members:


"""


import numpy as np
from MetricsReloaded.utility.utils import (
    CacheFunctionOutput,
    max_x_at_y_more,
    max_x_at_y_less,
    min_x_at_y_more,
    min_x_at_y_less,
    trapezoidal_integration,
)


__all__ = [
    "ProbabilityPairwiseMeasures",
]


class ProbabilityPairwiseMeasures(object):
    """
    Multi-threshold (probabilistic) pairwise measures between a prediction
    and a reference.

    The prediction is binarised at a series of thresholds (``pred >= thresh``)
    and, for each threshold, sensitivity, specificity, PPV and FPPI are
    derived. Summary measures (AUROC, AP, FROC, value-at-cut-off measures)
    are then read from these curves.

    :param pred_proba: predicted probabilities/scores (indexed and compared
        element-wise with ``ref_proba``; presumably a numpy array - confirm
        against callers)
    :param ref_proba: reference values, summed as-is to count positives
        (presumably a 0/1 array of the same shape - confirm against callers)
    :param case: optional per-element case identifiers used to compute the
        false positives per image; when None, FPPI is derived from the last
        axis of the reference instead
    :param measures: keys of ``measures_dict`` to evaluate in
        :meth:`to_dict_meas`; ``None`` selects every available measure,
        the default empty list selects none
    :param empty: flag marking an empty prediction; PPV is then reported
        as -1
    :param dict_args: optional cut-off values (``value_specificity``,
        ``value_sensitivity``, ``value_fppi``, ``value_ppv``,
        ``benefit_proba``)
    """

    # The mutable defaults are kept for interface compatibility; they are
    # only read, never modified, by this class.
    def __init__(
        self,
        pred_proba,
        ref_proba,
        case=None,
        measures=[],
        empty=False,
        dict_args={},
    ):
        # Maps measure key -> (bound method computing it, display name)
        self.measures_dict = {
            "sens@ppv": (self.sensitivity_at_ppv, "Sens@PPV"),
            "ppv@sens": (self.ppv_at_sensitivity, "PPV@Sens"),
            "sens@spec": (self.sensitivity_at_specificity, "Sens@Spec"),
            "spec@sens": (self.specificity_at_sensitivity, "Spec@Sens"),
            "fppi@sens": (
                self.fppi_at_sensitivity,
                "FPPI@Sens",
            ),
            "sens@fppi": (self.sensitivity_at_fppi, "Sens@FPPI"),
            "auroc": (self.auroc, "AUROC"),
            "ap": (self.average_precision, "AP"),
            "froc": (self.froc, "FROC"),
        }

        self.pred = pred_proba
        self.ref = ref_proba
        self.case = case
        self.flag_empty = empty
        self.dict_args = dict_args
        # None means "all measures"; iterating the dict yields its keys
        self.measures = measures if measures is not None else self.measures_dict

    @CacheFunctionOutput
    def fp_thr(self, thresh):
        """
        Number of false positives at the specified threshold
        """
        return np.sum(self.__fp_map_thr(thresh))

    @CacheFunctionOutput
    def fn_thr(self, thresh):
        """
        Number of false negatives at the specified threshold
        """
        return np.sum(self.__fn_map_thr(thresh))

    @CacheFunctionOutput
    def tp_thr(self, thresh):
        """
        Number of true positives at the specified threshold
        """
        return np.sum(self.__tp_map_thr(thresh))

    @CacheFunctionOutput
    def tn_thr(self, thresh):
        """
        Number of true negatives at the specified threshold
        """
        return np.sum(self.__tn_map_thr(thresh))

    @CacheFunctionOutput
    def n_pos_ref(self):
        """
        Sum of the reference, i.e. number of positive reference elements
        """
        return np.sum(self.ref)

    @CacheFunctionOutput
    def n_neg_ref(self):
        """
        Sum of (1 - reference), i.e. number of negative reference elements
        """
        return np.sum(1 - self.ref)

    @CacheFunctionOutput
    def all_multi_threshold_values(
        self, max_number_samples=150, max_number_thresh=1500
    ):
        """
        Function defining the list of values for ppv, sensitivity, specificity
        and FPPI according to a list of probabilistic thresholds.
        The thresholds are defined to obtain equal bin sizes.
        The default maximum number of thresholds is 1500

        :param max_number_samples: below this number of reference elements
            every unique predicted value is used as threshold
        :param max_number_thresh: below this number of unique predicted
            values every unique predicted value is used as threshold
        :return: thresholds (in decreasing order), list of sensitivities,
            list of specificities, list of PPVs, list of FPPIs
        """
        unique_thresh, unique_counts = np.unique(self.pred, return_counts=True)
        if len(unique_thresh) < max_number_thresh:
            unique_new_thresh = unique_thresh
        elif np.size(self.ref) < max_number_samples:
            unique_new_thresh = unique_thresh
        else:
            # Too many distinct values: group them in bins holding roughly
            # this number of samples and keep one threshold per bin.
            numb_samples_temp = np.size(self.pred) / max_number_thresh
            unique_new_thresh = [0]
            current_count = 0
            for (f, c) in zip(unique_thresh, unique_counts):
                if current_count < numb_samples_temp:
                    current_count += c
                    new_thresh = f
                else:
                    # Bin is full: record its last value. The value that
                    # triggers the flush is not counted in the next bin.
                    unique_new_thresh.append(new_thresh)
                    current_count = 0
        unique_new_thresh = np.asarray(unique_new_thresh)
        # Add a threshold above every prediction so that the curves start
        # from the "nothing predicted positive" operating point.
        unique_new_thresh = np.concatenate(
            [unique_new_thresh, np.asarray([1 + np.max(unique_thresh)])]
        )
        list_sens = []
        list_spec = []
        list_ppv = []
        list_fppi = []
        unique_new_thresh = np.sort(unique_new_thresh)[::-1]
        for val in unique_new_thresh:
            list_sens.append(self.sensitivity_thr(val))
            list_spec.append(self.specificity_thr(val))
            list_ppv.append(self.positive_predictive_values_thr(val))
            list_fppi.append(self.fppi_thr(val))
        # First threshold is above every prediction (TP + FP = 0), so the
        # PPV is undefined there; it is set to 1 by convention.
        list_ppv[0] = 1.0
        return unique_new_thresh, list_sens, list_spec, list_ppv, list_fppi

    def __fp_map_thr(self, thresh):
        """
        Map of FP given a specific threshold value

        :return: FP map at specified threshold
        """
        pred_bin = self.pred >= thresh
        return np.asarray((pred_bin - self.ref) > 0.0, dtype=np.float32)

    def __fn_map_thr(self, thresh):
        """
        This function calculates the false negative map based on a threshold

        :return: FN map
        """
        pred_bin = self.pred >= thresh
        return np.asarray((self.ref - pred_bin) > 0.0, dtype=np.float32)

    def __tp_map_thr(self, thresh):
        """
        TP map given a specified threshold

        :return: TP map at specified threshold
        """
        pred_bin = self.pred >= thresh
        return np.asarray((self.ref + pred_bin) > 1.0, dtype=np.float32)

    def __tn_map_thr(self, thresh):
        """
        TN map given a specified threshold

        :return: TN map at specified threshold
        """
        pred_bin = self.pred >= thresh
        return np.asarray((self.ref + pred_bin) < 0.5, dtype=np.float32)

    def positive_predictive_values_thr(self, thresh):
        """
        PPV given a specified threshold

        :return: PPV at specified threshold, -1 if the empty flag is set
        """
        if self.flag_empty:
            return -1
        return self.tp_thr(thresh) / (self.tp_thr(thresh) + self.fp_thr(thresh))

    def specificity_thr(self, thresh):
        """
        Specificity given a specified threshold

        :return: Specificity at specified threshold
        """
        return self.tn_thr(thresh) / self.n_neg_ref()

    def sensitivity_thr(self, thresh):
        """
        Sensitivity given a specified threshold

        :return: Sensitivity at specified threshold
        """
        return self.tp_thr(thresh) / self.n_pos_ref()

    def fppi_thr(self, thresh):
        """
        Average number of false positives per image at a specified threshold.

        When case identifiers are available, the false positives are counted
        separately for every case present in ``self.case`` and averaged.
        Otherwise the FP map is reshaped to ``[-1, ref.shape[-1]]``, summed
        over the first axis and averaged.

        :return: FPPI at specified threshold
        """
        if self.case is not None:
            case_ids = np.asarray(self.case)
            fp_per_case = []
            # Iterate over the identifiers actually present: a
            # range(max(case)) loop would never visit the largest
            # identifier and would add zero-FP entries for absent ones.
            for case_id in np.unique(case_ids):
                ind_case = np.where(case_ids == case_id)[0]
                case_tmp = ProbabilityPairwiseMeasures(
                    self.pred[ind_case], self.ref[ind_case]
                )
                fp_per_case.append(case_tmp.fp_thr(thresh))
            fppi = np.mean(np.asarray(fp_per_case))
        else:
            sum_per_image = np.sum(
                np.reshape(self.__fp_map_thr(thresh), [-1, self.ref.shape[-1]]),
                axis=0,
            )
            fppi = np.mean(sum_per_image)
        return fppi

    def net_benefit_treated(self):
        """
        Calculation of net benefit given a specified threshold (field
        benefit_proba of the arguments' dictionary). If not specified,
        calculated at threshold 0.5

        NB = TP / n - FP / n * (thresh / (1 - thresh))

        :return: net benefit at the specified threshold
        """
        thresh = self.dict_args.get("benefit_proba", 0.5)
        tp_thresh = self.tp_thr(thresh)
        fp_thresh = self.fp_thr(thresh)
        n = np.size(np.asarray(self.pred))
        # False positives are penalised by the odds of the threshold
        # probability and subtracted from the true positive rate.
        return tp_thresh / n - (fp_thresh / n) * (thresh / (1 - thresh))

    def auroc(self):
        """
        Calculation of AUROC using trapezoidal integration based
        on the threshold and values list obtained from the
        all_multi_threshold_values method

        James A Hanley and Barbara J McNeil. 1982. The meaning and use of the
        area under a receiver operating characteristic (ROC) curve.
        Radiology 143, 1 (1982), 29-36.

        :return: AUC
        """
        (
            unique_thresh,
            list_sens,
            list_spec,
            list_ppv,
            list_fppi,
        ) = self.all_multi_threshold_values()
        array_spec = np.asarray(list_spec)
        array_sens = np.asarray(list_sens)
        # Sensitivity integrated over the false positive rate (1 - spec)
        auroc = trapezoidal_integration(1 - array_spec, array_sens)
        return auroc

    def froc(self):
        """
        Calculation of FROC score: trapezoidal integration of the sensitivity
        as a function of the number of false positives per image, for FPPI
        values up to 8.

        Bram Van Ginneken, Samuel G Armato III, Bartjan de Hoop,
        Saskia van Amelsvoort-van de Vorst, Thomas Duindam, Meindert Niemeijer,
        Keelin Murphy, Arnold Schilham, Alessandra Retico, Maria Evelina
        Fantacci, et al. 2010. Comparing and combining algorithms for
        computer-aided detection of pulmonary nodules in computed tomography
        scans: the ANODE09 study. Medical image analysis 14, 6 (2010), 707-722.

        :return: FROC score
        """
        (
            unique_thresh,
            list_sens,
            list_spec,
            list_ppv,
            list_fppi,
        ) = self.all_multi_threshold_values()
        array_fppi = np.asarray(list_fppi)
        array_sens = np.asarray(list_sens)
        max_fppi = np.max(array_fppi)
        added_fppi = np.asarray([1.0 / 8, 1.0 / 4, 1.0 / 2, 1, 2, 4, 8])
        if max_fppi > 8:
            # Keep only the operating points located before the first one
            # exceeding 8 false positives per image. np.where returns a
            # tuple of index arrays, hence the explicit [0].
            first_over = np.min(np.where(array_fppi > 8)[0])
            array_fppi_new = array_fppi[:first_over]
            array_sens_new = array_sens[:first_over]
        else:
            # Extend the curve up to FPPI = 8 with the reference FPPI values
            # not reached yet, at the last observed sensitivity. Nothing is
            # added when the curve already ends at 8.
            extra_fppi = added_fppi[added_fppi > max_fppi]
            extra_sens = np.ones(extra_fppi.shape) * array_sens[-1]
            array_fppi_new = np.concatenate([array_fppi, extra_fppi])
            array_sens_new = np.concatenate([array_sens, extra_sens])
        froc = trapezoidal_integration(array_fppi_new, array_sens_new)
        return froc

    def average_precision(self):
        """
        Average precision calculation using trapezoidal integration. This
        integrates the precision as function of recall curve

        Tsung-Yi Lin, Michael Maire, Serge Belongie, James Hays, Pietro Perona,
        Deva Ramanan, Piotr Dollar, and C Lawrence Zitnick. 2014. Microsoft
        coco: Common objects in context. In European conference on computer
        vision. Springer, 740-755.

        :return: AP
        """
        (
            unique_thresh,
            list_sens,
            list_spec,
            list_ppv,
            list_fppi,
        ) = self.all_multi_threshold_values()
        ap = trapezoidal_integration(np.asarray(list_sens), np.asarray(list_ppv))
        return ap

    def sensitivity_at_specificity(self):
        """
        From specificity cut-off values in the value_specificity field
        of the dictionary of arguments dict_args,
        reading of the maximum sensitivity value for all specificities
        larger than the specified value. If value not specified,
        calculated at specificity of 0.8

        :return: sensitivity at specificity threshold
        """
        value_spec = self.dict_args.get("value_specificity", 0.8)
        (
            unique_thresh,
            list_sens,
            list_spec,
            list_ppv,
            list_fppi,
        ) = self.all_multi_threshold_values()
        value_max = max_x_at_y_more(list_sens, list_spec, value_spec)
        return value_max

    def specificity_at_sensitivity(self):
        """
        Specificity given specified sensitivity (Field value_sensitivity)
        in the arguments dictionary. If not specified, calculated at
        sensitivity=0.8

        :return: specificity at sensitivity threshold
        """
        value_sens = self.dict_args.get("value_sensitivity", 0.8)
        (
            unique_thresh,
            list_sens,
            list_spec,
            list_ppv,
            list_fppi,
        ) = self.all_multi_threshold_values()
        value_max = max_x_at_y_more(list_spec, list_sens, value_sens)
        return value_max

    def fppi_at_sensitivity(self):
        """
        FPPI value at specified sensitivity value (Field value_sensitivity)
        in the arguments' dictionary. If not specified, calculated at
        sensitivity 0.8

        NOTE(review): the value is read with max_x_at_y_more, which
        presumably returns the *largest* FPPI among the operating points
        reaching the sensitivity cut-off, while min_x_at_y_more is imported
        but not used - confirm which one is intended.

        :return: fppi at sensitivity threshold
        """
        value_sens = self.dict_args.get("value_sensitivity", 0.8)
        (
            unique_thresh,
            list_sens,
            list_spec,
            list_ppv,
            list_fppi,
        ) = self.all_multi_threshold_values()
        value_max = max_x_at_y_more(list_fppi, list_sens, value_sens)
        return value_max

    def sensitivity_at_fppi(self):
        """
        Sensitivity at specified value of FPPI (Field value_fppi)
        in the argument's dictionary. If not specified calculated at FPPI=0.8

        :return: sensitivity at fppi threshold
        """
        value_fppi = self.dict_args.get("value_fppi", 0.8)
        (
            unique_thresh,
            list_sens,
            list_spec,
            list_ppv,
            list_fppi,
        ) = self.all_multi_threshold_values()
        value_max = max_x_at_y_less(list_sens, list_fppi, value_fppi)
        return value_max

    def sensitivity_at_ppv(self):
        """
        Sensitivity at specified PPV (field value_ppv) in the
        arguments' dictionary. If not specified, calculated at value 0.8

        :return: sensitivity at PPV threshold
        """
        value_ppv = self.dict_args.get("value_ppv", 0.8)
        (
            unique_thresh,
            list_sens,
            list_spec,
            list_ppv,
            list_fppi,
        ) = self.all_multi_threshold_values()
        value_max = max_x_at_y_more(list_sens, list_ppv, value_ppv)
        return value_max

    def ppv_at_sensitivity(self):
        """
        PPV at specified sensitivity value (Field value_sensitivity)
        in the argument's dictionary. If not specified, calculated at
        value 0.8

        :return: PPV at sensitivity threshold
        """
        value_sens = self.dict_args.get("value_sensitivity", 0.8)
        (
            unique_thresh,
            list_sens,
            list_spec,
            list_ppv,
            list_fppi,
        ) = self.all_multi_threshold_values()
        value_max = max_x_at_y_more(list_ppv, list_sens, value_sens)
        return value_max

    def to_dict_meas(self, fmt="{:.4f}"):
        """
        Transforming the results to form a dictionary

        :param fmt: format string kept for interface compatibility; the raw
            (unformatted) values are returned
        :return: dictionary mapping each requested measure key to its value
        """
        result_dict = {}
        for key in self.measures:
            result_dict[key] = self.measures_dict[key][0]()
        return result_dict