#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import itertools
from collections import defaultdict
from json import dumps as json_dumps
from typing import (
Any,
DefaultDict,
Dict,
List,
NamedTuple,
Optional,
Sequence,
Tuple,
Union,
)
import numpy as np
from pytext.common.constants import SpecialTokens
from pytext.utils import cuda
from pytext.utils.ascii_table import ascii_table
NAN_LABELS = [SpecialTokens.UNK, SpecialTokens.PAD]
RECALL_AT_PRECISION_THRESHOLDS = [0.2, 0.4, 0.6, 0.8, 0.9]
PRECISION_AT_RECALL_THRESHOLDS = [0.2, 0.4, 0.6, 0.8, 0.9]
"""
Basic metric classes and functions for single-label prediction problems,
with extensions for multi-label support.
"""
class LabelPrediction(NamedTuple):
"""
Label predictions of an example.
Attributes:
label_scores: Confidence scores that each label receives.
predicted_label: Index of the predicted label. This is usually the label with
the highest confidence score in label_scores.
expected_label: Index of the true label.
"""
label_scores: List[float]
predicted_label: int
expected_label: int
class LabelListPrediction(NamedTuple):
"""
Label list predictions of an example.
Attributes:
label_scores: Confidence scores that each label receives.
predicted_label: List of indices of the predicted label.
expected_label: List of indices of the true label.
"""
label_scores: List[float]
predicted_label: List[int]
expected_label: List[int]
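# Illustrative sketch (not part of the library API): how the two prediction tuples
# above are typically populated. The scores and indices below are made-up values.
#
#     single = LabelPrediction(
#         label_scores=[0.7, 0.2, 0.1],  # confidence per label
#         predicted_label=0,             # index of the argmax of label_scores
#         expected_label=2,              # index of the gold label
#     )
#     multi = LabelListPrediction(
#         label_scores=[0.7, 0.2, 0.6],
#         predicted_label=[1, 0, 1],     # consumed as an n-hot vector by the
#                                        # multi-label metric functions below
#         expected_label=[0, 2],         # indices of the gold labels
#     )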
class PRF1Scores(NamedTuple):
"""
Precision/recall/F1 scores for a collection of predictions.
Attributes:
true_positives: Number of true positives.
false_positives: Number of false positives.
false_negatives: Number of false negatives.
precision: TP / (TP + FP).
recall: TP / (TP + FN).
f1: 2 * TP / (2 * TP + FP + FN).
"""
true_positives: int
false_positives: int
false_negatives: int
precision: float
recall: float
f1: float
class SoftClassificationMetrics(NamedTuple):
"""
Classification scores that are independent of thresholds.
"""
average_precision: float
recall_at_precision: Dict[float, float]
decision_thresh_at_precision: Dict[float, float]
precision_at_recall: Dict[float, float]
decision_thresh_at_recall: Dict[float, float]
roc_auc: Optional[float]
class MultiLabelSoftClassificationMetrics(NamedTuple):
"""
    Multi-label classification scores that are independent of thresholds.
"""
average_label_precision: Dict[str, float]
average_overall_precision: float
average_label_recall: Dict[str, float]
average_overall_recall: float
recall_at_precision: Dict[str, Dict[str, Dict[float, float]]]
decision_thresh_at_precision: Dict[str, Dict[str, Dict[float, float]]]
precision_at_recall: Dict[str, Dict[str, Dict[float, float]]]
decision_thresh_at_recall: Dict[str, Dict[str, Dict[float, float]]]
roc_auc: Optional[Dict[Optional[str], Optional[Dict[str, Optional[float]]]]]
average_overall_auc: float
label_accuracy: Dict[str, float]
average_overall_accuracy: float
class MacroPRF1Scores(NamedTuple):
"""
Macro precision/recall/F1 scores (averages across each label).
Attributes:
        num_labels: Number of distinct labels.
precision: Equally weighted average of precisions for each label.
recall: Equally weighted average of recalls for each label.
f1: Equally weighted average of F1 scores for each label.
"""
num_labels: int
precision: float
recall: float
f1: float
class MacroPRF1Metrics(NamedTuple):
"""
Aggregated metric class for macro precision/recall/F1 scores.
Attributes:
per_label_scores: Mapping from label string to the corresponding
precision/recall/F1 scores.
macro_scores: Macro precision/recall/F1 scores across the labels in
`per_label_scores`.
"""
per_label_scores: Dict[str, PRF1Scores]
macro_scores: MacroPRF1Scores
    def print_metrics(self, indentation="") -> None:
print(
ascii_table(
[
{
"label": label,
"precision": f"{metrics.precision:.2f}",
"recall": f"{metrics.recall:.2f}",
"f1": f"{metrics.f1:.2f}",
"support": metrics.true_positives + metrics.false_negatives,
}
for label, metrics in sorted(self.per_label_scores.items())
],
human_column_names={
"label": "Label",
"precision": "Precision",
"recall": "Recall",
"f1": "F1",
"support": "Support",
},
footer={
"label": "Overall macro scores",
"precision": f"{self.macro_scores.precision:.2f}",
"recall": f"{self.macro_scores.recall:.2f}",
"f1": f"{self.macro_scores.f1:.2f}",
},
alignments={"label": "<"},
indentation=indentation,
)
)
class PRF1Metrics(NamedTuple):
"""
Metric class for all types of precision/recall/F1 scores.
Attributes:
per_label_scores: Map from label string to the corresponding precision/recall/F1
scores.
macro_scores: Macro precision/recall/F1 scores across the labels in
`per_label_scores`.
micro_scores: Micro (regular) precision/recall/F1 scores for the same
collection of predictions.
"""
per_label_scores: Dict[str, PRF1Scores]
macro_scores: MacroPRF1Scores
micro_scores: PRF1Scores
    def print_metrics(self) -> None:
res = (
f"\t{'Per label scores':<40}"
f"\t{'Precision':<10}"
f"\t{'Recall':<10}"
f"\t{'F1':<10}"
f"\t{'Support':<10}\n\n"
)
for label, label_metrics in self.per_label_scores.items():
support = label_metrics.true_positives + label_metrics.false_negatives
res += (
f"\t{label:<40}"
f"\t{label_metrics.precision * 100:<10.3f}"
f"\t{label_metrics.recall * 100:<10.3f}"
f"\t{label_metrics.f1 * 100:<10.3f}"
f"\t{support:<10}\n"
)
support = self.micro_scores.true_positives + self.micro_scores.false_negatives
res += (
f"\n\t{'Overall micro scores':<40}"
f"\t{self.micro_scores.precision * 100:<10.3f}"
f"\t{self.micro_scores.recall * 100:<10.3f}"
f"\t{self.micro_scores.f1 * 100:<10.3f}"
f"\t{support:<10}\n"
)
res += (
f"\t{'Overall macro scores':<40}"
f"\t{self.macro_scores.precision * 100:<10.3f}"
f"\t{self.macro_scores.recall * 100:<10.3f}"
f"\t{self.macro_scores.f1 * 100:<10.3f}\n"
)
print(res)
class ClassificationMetrics(NamedTuple):
"""
Metric class for various classification metrics.
Attributes:
accuracy: Overall accuracy of predictions.
macro_prf1_metrics: Macro precision/recall/F1 scores.
per_label_soft_scores: Per label soft metrics.
mcc: Matthews correlation coefficient.
roc_auc: Area under the Receiver Operating Characteristic curve.
loss: Training loss (only used for selecting best model, no need to print).
"""
accuracy: float
macro_prf1_metrics: MacroPRF1Metrics
per_label_soft_scores: Optional[Dict[str, SoftClassificationMetrics]]
mcc: Optional[float]
roc_auc: Optional[float]
loss: float
    def print_metrics(self, report_pep=False) -> None:
print(f"Accuracy: {self.accuracy * 100:.2f}")
print("\nSoft Metrics:")
if self.per_label_soft_scores:
soft_scores = [
{
"label": label,
"avg_pr": f"{metrics.average_precision:.3f}",
"roc_auc": f"{(metrics.roc_auc or 0.0):.3f}",
}
for label, metrics in sorted(self.per_label_soft_scores.items())
]
columns = {
"label": "Label",
"avg_pr": "Average precision",
"roc_auc": "ROC AUC",
}
print(ascii_table(soft_scores, columns))
print("\nRecall at Precision")
r_at_p_thresholds = set(
itertools.chain.from_iterable(
metrics.recall_at_precision
for metrics in self.per_label_soft_scores.values()
)
)
print(
ascii_table(
(
dict(
{"label": label},
**{
str(p): f"{r:.3f}"
for p, r in metrics.recall_at_precision.items()
},
)
for label, metrics in sorted(self.per_label_soft_scores.items())
),
dict(
{"label": "Label"},
**{str(t): f"R@P {t}" for t in r_at_p_thresholds},
),
alignments={"label": "<"},
)
)
print("\nPrecision at Recall")
p_at_r_thresholds = set(
itertools.chain.from_iterable(
metrics.precision_at_recall
for metrics in self.per_label_soft_scores.values()
)
)
print(
ascii_table(
(
dict(
{"label": label},
**{
str(p): f"{r:.3f}"
for p, r in metrics.precision_at_recall.items()
},
)
for label, metrics in sorted(self.per_label_soft_scores.items())
),
dict(
{"label": "Label"},
**{str(t): f"P@R {t}" for t in p_at_r_thresholds},
),
alignments={"label": "<"},
)
)
if self.mcc:
print(f"\nMatthews correlation coefficient: {self.mcc :.3f}")
if self.roc_auc:
print(f"\nROC AUC: {self.roc_auc:.3f}")
if report_pep:
self.print_pep()
    def print_pep(self):
metrics = {"Accuracy": f"{self.accuracy * 100:.2f}"}
if self.roc_auc:
metrics["ROC AUC"] = f"{self.roc_auc :.3f}"
for key, value in metrics.items():
info = {"type": "NET", "metric": key, "unit": "None", "value": value}
print("PyTorchObserver " + json_dumps(info))
class Confusions:
"""
Confusion information for a collection of predictions.
Attributes:
TP: Number of true positives.
FP: Number of false positives.
FN: Number of false negatives.
"""
__slots__ = "TP", "FP", "FN"
def __init__(self, TP: int = 0, FP: int = 0, FN: int = 0) -> None:
self.TP: int = TP
self.FP: int = FP
self.FN: int = FN
def __eq__(self, other: Any) -> bool:
if not isinstance(other, Confusions):
return NotImplemented
return self.TP == other.TP and self.FP == other.FP and self.FN == other.FN
def __add__(self, other: "Confusions") -> "Confusions":
return Confusions(
TP=self.TP + other.TP, FP=self.FP + other.FP, FN=self.FN + other.FN
)
def __iadd__(self, other: "Confusions") -> "Confusions":
self.TP += other.TP
self.FP += other.FP
self.FN += other.FN
return self
def _asdict(self) -> Dict:
return {"TP": self.TP, "FP": self.FP, "FN": self.FN}
    def compute_metrics(self) -> PRF1Scores:
precision, recall, f1 = compute_prf1(self.TP, self.FP, self.FN)
return PRF1Scores(
true_positives=self.TP,
false_positives=self.FP,
false_negatives=self.FN,
precision=precision,
recall=recall,
f1=f1,
)
class PerLabelConfusions:
"""
Per label confusion information.
Attributes:
label_confusions_map: Map from label string to the corresponding confusion
counts.
"""
__slots__ = "label_confusions_map"
def __init__(self) -> None:
self.label_confusions_map: DefaultDict[str, Confusions] = defaultdict(
Confusions
)
    def update(self, label: str, item: str, count: int) -> None:
"""
        Increase one of the TP, FP or FN counts for a label by a given amount.
Args:
label: Label to be modified.
item: Type of count to be modified, should be one of "TP", "FP" or "FN".
count: Amount to be added to the count.
Returns:
None
"""
confusions = self.label_confusions_map[label]
setattr(confusions, item, getattr(confusions, item) + count)
    def compute_metrics(self) -> MacroPRF1Metrics:
per_label_scores: Dict[str, PRF1Scores] = {}
precision_sum, recall_sum, f1_sum = 0.0, 0.0, 0.0
for label, confusions in sorted(self.label_confusions_map.items()):
scores = confusions.compute_metrics()
per_label_scores[label] = scores
if confusions.TP + confusions.FN > 0:
precision_sum += scores.precision
recall_sum += scores.recall
f1_sum += scores.f1
num_labels = len(self.label_confusions_map)
return MacroPRF1Metrics(
per_label_scores=per_label_scores,
macro_scores=MacroPRF1Scores(
num_labels=num_labels,
precision=safe_division(precision_sum, num_labels),
recall=safe_division(recall_sum, num_labels),
f1=safe_division(f1_sum, num_labels),
),
)
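# Usage sketch (illustrative counts): accumulate confusion counts per label, then
# derive macro-averaged precision/recall/F1.
#
#     plc = PerLabelConfusions()
#     plc.update("weather", "TP", 3)
#     plc.update("weather", "FN", 1)
#     plc.update("music", "FP", 2)
#     macro_metrics = plc.compute_metrics()  # MacroPRF1Metrics over both labels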
class AllConfusions:
"""
Aggregated class for per label confusions.
Attributes:
per_label_confusions: Per label confusion information.
confusions: Overall TP, FP and FN counts across the labels in
`per_label_confusions`.
"""
__slots__ = "per_label_confusions", "confusions"
def __init__(self) -> None:
self.per_label_confusions = PerLabelConfusions()
self.confusions = Confusions()
    def compute_metrics(self) -> PRF1Metrics:
per_label_metrics = self.per_label_confusions.compute_metrics()
return PRF1Metrics(
per_label_scores=per_label_metrics.per_label_scores,
macro_scores=per_label_metrics.macro_scores,
micro_scores=self.confusions.compute_metrics(),
)
class PairwiseRankingMetrics(NamedTuple):
"""
Metric class for pairwise ranking
Attributes:
num_examples (int): number of samples
        accuracy (float): fraction of pairs that were ranked in the correct order
average_score_difference (float): average score(higherRank) - score(lowerRank)
"""
num_examples: int
accuracy: float
average_score_difference: float
    def print_metrics(self) -> None:
print(f"RankingAccuracy: {self.accuracy * 100:.2f}")
print(f"AvgScoreDiff: {self.average_score_difference}")
print(f"NumExamples: {self.num_examples}")
class RegressionMetrics(NamedTuple):
"""
Metrics for regression tasks.
Attributes:
num_examples (int): number of examples
pearson_correlation (float): correlation between predictions and labels
mse (float): mean-squared error between predictions and labels
"""
num_examples: int
pearson_correlation: float
mse: float
    def print_metrics(self):
print(f"Num examples: {self.num_examples}")
print(f"Pearson correlation: {self.pearson_correlation:.3f}")
print(f"Mean squared error: {self.mse:.3f}")
class RealtimeMetrics(NamedTuple):
"""
Realtime Metrics for tracking training progress and performance.
Attributes:
samples (int): number of samples
tps (float): tokens per second
ups (float): updates per second
"""
samples: int
tps: float
ups: float
def _format(self, key, value):
if key in ("tps", "ups"):
return round(value)
return value
def __str__(self):
metrics = {"num_gpus": cuda.DISTRIBUTED_WORLD_SIZE}
for key, value in self._asdict().items():
if not value:
continue
metrics[key] = self._format(key, value)
return str(metrics)
def safe_division(n: Union[int, float], d: Union[int, float]) -> float:
return float(n) / d if d else 0.0
def compute_prf1(tp: int, fp: int, fn: int) -> Tuple[float, float, float]:
precision = safe_division(tp, tp + fp)
recall = safe_division(tp, tp + fn)
f1 = safe_division(2 * tp, 2 * tp + fp + fn)
return (precision, recall, f1)
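# Worked example (illustrative): compute_prf1(tp=8, fp=2, fn=4) returns
# precision 8/10 = 0.8, recall 8/12 ~= 0.667 and f1 = 16/22 ~= 0.727.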
def average_precision_score(
y_true_sorted: np.ndarray, y_score_sorted: np.ndarray
) -> float:
"""
Computes average precision, which summarizes the precision-recall curve as the
precisions achieved at each threshold weighted by the increase in recall since the
previous threshold.
Args:
y_true_sorted: Numpy array sorted according to decreasing confidence scores
indicating whether each prediction is correct.
        y_score_sorted: Numpy array of confidence scores for the predictions in
decreasing order.
Returns:
Average precision score.
TODO: This is too slow, improve the performance
"""
ap = 0.0
tp = 0
threshold = y_score_sorted[0]
    y_score_sorted = np.append(y_score_sorted[1:], np.nan)
total_positive = np.sum(y_true_sorted)
added_positives = 0
for k, (label, score) in enumerate(zip(y_true_sorted, y_score_sorted)):
if label:
added_positives += 1
if score != threshold:
threshold = score
recall_diff = added_positives / total_positive
tp += added_positives
added_positives = 0
            p_at_thresh = tp / (k + 1)
            ap += p_at_thresh * recall_diff
return float(ap)
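# Worked example (illustrative): for y_true_sorted = np.array([True, False, True])
# and y_score_sorted = np.array([0.9, 0.8, 0.7]), precision is 1.0 when recall first
# reaches 0.5 and 2/3 when it reaches 1.0, so the average precision is
# 0.5 * 1.0 + 0.5 * (2 / 3) ~= 0.833.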
def sort_by_score(y_true_list: Sequence[bool], y_score_list: Sequence[float]):
y_true = np.array(y_true_list)
y_score = np.array(y_score_list)
sort_indices = np.argsort(y_score, kind="mergesort")[::-1]
y_true = y_true[sort_indices]
y_score = y_score[sort_indices]
return y_true, y_score
def recall_at_precision(
    y_true_sorted: np.ndarray, y_score_sorted: np.ndarray, thresholds: Sequence[float]
) -> Tuple[Dict[float, float], Dict[float, float]]:
"""
Computes recall at various precision levels
Args:
y_true_sorted: Numpy array sorted according to decreasing confidence scores
indicating whether each prediction is correct.
y_score_sorted: Numpy array of confidence scores for the predictions in
decreasing order.
thresholds: Sequence of floats indicating the requested precision thresholds
Returns:
        Dictionary of maximum recall at requested precision thresholds.
        Dictionary of decision thresholds resulting in max recall at
        requested precision thresholds.
"""
y_score_shift = np.append(y_score_sorted[1:], np.nan)
score_change = (y_score_sorted - y_score_shift) != 0
cum_sum = np.cumsum(y_true_sorted)
recall_at_precision_dict = {t: 0.0 for t in thresholds}
decision_thresh_at_precision_dict = {t: 0.0 for t in thresholds}
sum_y_true = y_true_sorted.sum()
if sum_y_true == 0:
return recall_at_precision_dict, decision_thresh_at_precision_dict
recall = cum_sum / sum_y_true
precision = cum_sum / np.array(range(1, len(y_true_sorted) + 1))
for threshold in thresholds:
meets_requirements = np.logical_and(precision >= threshold, score_change)
if not np.any(meets_requirements):
continue
recall_at_precision_dict[threshold] = float(
max(np.extract(meets_requirements, recall))
)
decision_thresh_at_precision_dict[threshold] = float(
min(np.extract(meets_requirements, y_score_sorted))
)
return recall_at_precision_dict, decision_thresh_at_precision_dict
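# Worked example (illustrative): with y_true_sorted = np.array([1, 1, 0, 1]),
# y_score_sorted = np.array([0.9, 0.8, 0.7, 0.6]) and thresholds = [0.5, 0.9],
# precision stays >= 0.9 only for the top two predictions, so recall at precision
# 0.9 is 2/3 with decision threshold 0.8, while recall at precision 0.5 is 1.0
# with decision threshold 0.6.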
def precision_at_recall(
y_true_sorted: np.ndarray, y_score_sorted: np.ndarray, thresholds: Sequence[float]
) -> Tuple[Dict[float, float], Dict[float, float]]:
"""
Computes precision at various recall levels
Args:
y_true_sorted: Numpy array sorted according to decreasing confidence scores
indicating whether each prediction is correct.
y_score_sorted: Numpy array of confidence scores for the predictions in
decreasing order.
thresholds: Sequence of floats indicating the requested recall thresholds
Returns:
Dictionary of maximum precision at requested recall thresholds.
Dictionary of decision thresholds resulting in max precision at
requested recall thresholds.
"""
y_score_shift = np.append(y_score_sorted[1:], np.nan)
score_change = (y_score_sorted - y_score_shift) != 0
cum_sum = np.cumsum(y_true_sorted)
precision_at_recall_dict = {t: 0.0 for t in thresholds}
decision_thresh_at_recall_dict = {t: 0.0 for t in thresholds}
sum_y_true = y_true_sorted.sum()
if sum_y_true == 0:
return precision_at_recall_dict, decision_thresh_at_recall_dict
recall = cum_sum / sum_y_true
precision = cum_sum / np.array(range(1, len(y_true_sorted) + 1))
for threshold in thresholds:
meets_requirements = np.logical_and(recall >= threshold, score_change)
if not np.any(meets_requirements):
continue
precisions_meeting_requirements = np.extract(meets_requirements, precision)
idx_max_precision_at_recall = np.amin(
np.argmax(precisions_meeting_requirements), axis=None
)
precision_at_recall_dict[threshold] = float(
precisions_meeting_requirements[idx_max_precision_at_recall]
)
decision_thresh_at_recall_dict[threshold] = float(
np.extract(meets_requirements, y_score_sorted)[idx_max_precision_at_recall]
)
return precision_at_recall_dict, decision_thresh_at_recall_dict
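# Worked example (illustrative): with the same y_true_sorted = np.array([1, 1, 0, 1])
# and y_score_sorted = np.array([0.9, 0.8, 0.7, 0.6]), recall first reaches 0.6 after
# the top two predictions, where precision is still 1.0, so precision at recall 0.6
# is 1.0 with decision threshold 0.8.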
def compute_average_recall(
predictions: Sequence[LabelPrediction],
label_names: Sequence[str],
average_precisions: Dict[str, float],
) -> float:
recalls = []
for i, label_name in enumerate(label_names):
y_true = []
y_score = []
for label_scores, _, expected in predictions:
y_true.append(expected == i)
y_score.append(label_scores[i])
y_true_sorted, y_score_sorted = sort_by_score(y_true, y_score)
recall_at_precision_dict, _ = recall_at_precision(
y_true_sorted, y_score_sorted, [average_precisions[label_name]]
)
for _, value in recall_at_precision_dict.items():
recalls.append(value)
return sum(v for v in recalls) / (len(recalls) * 1.0)
def compute_soft_metrics(
predictions: Sequence[LabelPrediction],
label_names: Sequence[str],
recall_at_precision_thresholds: Sequence[float] = RECALL_AT_PRECISION_THRESHOLDS,
precision_at_recall_thresholds: Sequence[float] = PRECISION_AT_RECALL_THRESHOLDS,
) -> Dict[str, SoftClassificationMetrics]:
"""
Computes soft classification metrics given a list of label predictions.
Args:
predictions: Label predictions, including the confidence score for each label.
label_names: Indexed label names.
recall_at_precision_thresholds: precision thresholds at which to calculate
recall
precision_at_recall_thresholds: recall thresholds at which to calculate
precision
Returns:
Dict from label strings to their corresponding soft metrics.
"""
soft_metrics = {}
for i, label_name in enumerate(label_names):
y_true = []
y_score = []
for label_scores, _, expected in predictions:
y_true.append(expected == i)
y_score.append(label_scores[i])
y_true_sorted, y_score_sorted = sort_by_score(y_true, y_score)
ap = average_precision_score(y_true_sorted, y_score_sorted)
recall_at_precision_dict, decision_thresh_at_precision = recall_at_precision(
y_true_sorted, y_score_sorted, recall_at_precision_thresholds
)
precision_at_recall_dict, decision_thresh_at_recall = precision_at_recall(
y_true_sorted, y_score_sorted, precision_at_recall_thresholds
)
roc_auc = compute_roc_auc(predictions, target_class=i)
soft_metrics[label_name] = SoftClassificationMetrics(
average_precision=ap,
recall_at_precision=recall_at_precision_dict,
decision_thresh_at_precision=decision_thresh_at_precision,
precision_at_recall=precision_at_recall_dict,
decision_thresh_at_recall=decision_thresh_at_recall,
roc_auc=roc_auc,
)
return soft_metrics
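# Usage sketch (illustrative scores and label names):
#
#     preds = [
#         LabelPrediction([0.9, 0.1], 0, 0),
#         LabelPrediction([0.4, 0.6], 1, 0),
#         LabelPrediction([0.2, 0.8], 1, 1),
#     ]
#     soft = compute_soft_metrics(preds, ["accept", "reject"])
#     soft["accept"].average_precision  # plus recall_at_precision, roc_auc, ...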
def compute_multi_label_soft_metrics(
predictions: Sequence[LabelListPrediction],
label_names: Sequence[str],
recall_at_precision_thresholds: Sequence[float] = RECALL_AT_PRECISION_THRESHOLDS,
precision_at_recall_thresholds: Sequence[float] = PRECISION_AT_RECALL_THRESHOLDS,
) -> Dict[str, SoftClassificationMetrics]:
"""
Computes multi-label soft classification metrics
Args:
predictions: multi-label predictions,
including the confidence score for each label.
label_names: Indexed label names.
recall_at_precision_thresholds: precision thresholds at which to calculate
recall
precision_at_recall_thresholds: recall thresholds at which to calculate
precision
Returns:
Dict from label strings to their corresponding soft metrics.
"""
soft_metrics = {}
for i, label_name in enumerate(label_names):
y_true = []
y_score = []
for label_scores, _, expected in predictions:
y_true.append(i in expected)
y_score.append(label_scores[i])
y_true_sorted, y_score_sorted = sort_by_score(y_true, y_score)
ap = average_precision_score(y_true_sorted, y_score_sorted)
recall_at_precision_dict, decision_thresh_at_precision = recall_at_precision(
y_true_sorted, y_score_sorted, recall_at_precision_thresholds
)
precision_at_recall_dict, decision_thresh_at_recall = precision_at_recall(
y_true_sorted, y_score_sorted, precision_at_recall_thresholds
)
roc_auc = compute_roc_auc_given_sorted_positives(y_true_sorted)
print(f"label_name {label_name}", flush=True)
print(roc_auc, flush=True)
soft_metrics[label_name] = SoftClassificationMetrics(
average_precision=ap,
recall_at_precision=recall_at_precision_dict,
decision_thresh_at_precision=decision_thresh_at_precision,
precision_at_recall=precision_at_recall_dict,
decision_thresh_at_recall=decision_thresh_at_recall,
roc_auc=roc_auc,
)
return soft_metrics
def compute_multi_label_soft_full_vector_metrics(
predictions: Sequence[LabelListPrediction],
label_names: Sequence[str],
recall_at_precision_thresholds: Sequence[float] = RECALL_AT_PRECISION_THRESHOLDS,
precision_at_recall_thresholds: Sequence[float] = PRECISION_AT_RECALL_THRESHOLDS,
) -> Dict[str, SoftClassificationMetrics]:
"""
    Computes multi-label soft classification metrics from full label vectors.
Args:
predictions: multi-label predictions,
including the confidence score for each label.
label_names: Indexed label names. May contain duplicate label names.
recall_at_precision_thresholds: precision thresholds at which to calculate
recall
precision_at_recall_thresholds: recall thresholds at which to calculate
precision
Returns:
Dict from label strings to their corresponding soft metrics.
"""
soft_metrics = {}
y_true = {}
y_score = {}
for i, label_name in enumerate(label_names):
if label_name not in y_true:
y_true[label_name] = []
y_score[label_name] = []
for label_scores, _, expected in predictions:
y_true[label_name].append(expected[i])
y_score[label_name].append(label_scores[i])
for i, label_name in enumerate(label_names):
y_true_sorted, y_score_sorted = sort_by_score(
y_true[label_name], y_score[label_name]
)
ap = average_precision_score(y_true_sorted, y_score_sorted)
recall_at_precision_dict, decision_thresh_at_precision = recall_at_precision(
y_true_sorted, y_score_sorted, recall_at_precision_thresholds
)
precision_at_recall_dict, decision_thresh_at_recall = precision_at_recall(
y_true_sorted, y_score_sorted, precision_at_recall_thresholds
)
roc_auc = compute_roc_auc(predictions, target_class=i)
soft_metrics[label_name] = SoftClassificationMetrics(
average_precision=ap,
recall_at_precision=recall_at_precision_dict,
decision_thresh_at_precision=decision_thresh_at_precision,
precision_at_recall=precision_at_recall_dict,
decision_thresh_at_recall=decision_thresh_at_recall,
roc_auc=roc_auc,
)
return soft_metrics
def compute_multi_label_multi_class_soft_metrics(
predictions: Sequence[Sequence[LabelPrediction]],
label_names: Sequence[str],
label_vocabs: Sequence[Sequence[str]],
recall_at_precision_thresholds: Sequence[float] = RECALL_AT_PRECISION_THRESHOLDS,
precision_at_recall_thresholds: Sequence[float] = PRECISION_AT_RECALL_THRESHOLDS,
) -> MultiLabelSoftClassificationMetrics:
"""
    Computes soft classification metrics in the multi-label setting, where each
    label has its own multi-class vocabulary.
Args:
predictions: multi-label predictions,
including the confidence score for each label.
        label_names: Indexed label names.
        label_vocabs: Vocabulary (sequence of class names) for each label.
recall_at_precision_thresholds: precision thresholds at which to calculate
recall
precision_at_recall_thresholds: recall thresholds at which to calculate
precision
Returns:
        MultiLabelSoftClassificationMetrics object with the aggregated metrics.
"""
average_precision = {}
average_recall = {}
recall_at_precision = {}
decision_thresh_at_precision = {}
precision_at_recall = {}
decision_thresh_at_recall = {}
roc_auc = {}
class_accuracy = {}
average_auc = []
for label_idx, label_vocab in enumerate(label_vocabs):
label = list(label_names)[label_idx]
avg = (
sum(1 for s, p, e in predictions[label_idx] if p == e)
/ len(predictions[label_idx])
* 1.0
)
class_accuracy[label] = avg
soft_metrics_ = compute_soft_metrics(predictions[label_idx], label_vocab)
temp_avg_precision_ = {k: v.average_precision for k, v in soft_metrics_.items()}
average_precision[label] = sum(
v for k, v in temp_avg_precision_.items() if k not in NAN_LABELS
) / (
sum(1 for k, v in temp_avg_precision_.items() if k not in NAN_LABELS) * 1.0
)
average_recall[label] = compute_average_recall(
predictions[label_idx], label_vocab, temp_avg_precision_
)
recall_at_precision[label] = {
k: v.recall_at_precision for k, v in soft_metrics_.items()
}
decision_thresh_at_precision[label] = {
k: v.decision_thresh_at_precision for k, v in soft_metrics_.items()
}
precision_at_recall[label] = {
k: v.precision_at_recall for k, v in soft_metrics_.items()
}
decision_thresh_at_recall[label] = {
k: v.decision_thresh_at_recall for k, v in soft_metrics_.items()
}
roc_auc[label] = {k: v.roc_auc for k, v in soft_metrics_.items()}
average_auc.append(
sum(v for v in roc_auc[label].values()) / (len(roc_auc[label]) * 1.0)
)
return MultiLabelSoftClassificationMetrics(
average_label_precision=average_precision,
average_overall_precision=sum(v for v in average_precision.values())
/ (len(average_precision) * 1.0),
average_label_recall=average_recall,
average_overall_recall=sum(v for v in average_recall.values())
/ (len(average_recall) * 1.0),
recall_at_precision=recall_at_precision,
decision_thresh_at_precision=decision_thresh_at_precision,
precision_at_recall=precision_at_recall,
decision_thresh_at_recall=decision_thresh_at_recall,
roc_auc=roc_auc,
average_overall_auc=sum(v for v in average_auc) / (len(average_auc) * 1.0),
label_accuracy=class_accuracy,
average_overall_accuracy=sum(v for v in class_accuracy.values())
/ (len(class_accuracy) * 1.0),
)
def compute_matthews_correlation_coefficients(
TP: int, FP: int, FN: int, TN: int
) -> float:
"""
Computes Matthews correlation coefficient, a way to summarize all four counts (TP,
FP, FN, TN) in the confusion matrix of binary classification.
Args:
TP: Number of true positives.
FP: Number of false positives.
FN: Number of false negatives.
TN: Number of true negatives.
Returns:
        Matthews correlation coefficient, which is `(TP * TN - FP * FN) /
        sqrt((TP + FP) * (TP + FN) * (TN + FP) * (TN + FN))`.
"""
mcc = safe_division(
(TP * TN) - (FP * FN),
np.sqrt(float((TP + FP) * (TP + FN) * (TN + FP) * (TN + FN))),
)
return mcc
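# Worked example (illustrative): for TP=6, FP=1, FN=2, TN=3 the numerator is
# 6 * 3 - 1 * 2 = 16 and the denominator is sqrt(7 * 8 * 4 * 5) ~= 33.47,
# giving an MCC of roughly 0.48.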
def compute_roc_auc_given_sorted_positives(
y_true_sorted: np.ndarray,
) -> Optional[float]:
# Compute auc as probability that a positive example is scored higher than
# a negative example.
n_false = 0
n_correct_pair_order = 0
for y in reversed(y_true_sorted): # want low predicted to high predicted
if y:
n_correct_pair_order += n_false
else:
n_false += 1
n_true = len(y_true_sorted) - n_false
if n_true == 0 or n_false == 0:
return None
return float(n_correct_pair_order / (n_true * n_false))
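# Worked example (illustrative): for y_true_sorted = np.array([1, 0, 1, 0])
# (sorted by decreasing score), three of the four positive/negative pairs are
# ordered correctly, so the function returns 3 / (2 * 2) = 0.75.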
def compute_roc_auc(
predictions: Sequence[LabelPrediction], target_class: int = 0
) -> Optional[float]:
"""
Computes area under the Receiver Operating Characteristic curve, for binary
classification. Implementation based off of (and explained at)
https://www.ibm.com/developerworks/community/blogs/jfp/entry/Fast_Computation_of_AUC_ROC_score?lang=en.
"""
# Collect scores
y_true = [expected == target_class for _, _, expected in predictions]
y_score = [label_scores[target_class] for label_scores, _, _ in predictions]
y_true_sorted, _ = sort_by_score(y_true, y_score)
return compute_roc_auc_given_sorted_positives(y_true_sorted)
def compute_classification_metrics(
predictions: Sequence[LabelPrediction],
label_names: Sequence[str],
loss: float,
average_precisions: bool = True,
recall_at_precision_thresholds: Sequence[float] = RECALL_AT_PRECISION_THRESHOLDS,
precision_at_recall_thresholds: Sequence[float] = PRECISION_AT_RECALL_THRESHOLDS,
) -> ClassificationMetrics:
"""
A general function that computes classification metrics given a list of label
predictions.
Args:
predictions: Label predictions, including the confidence score for each label.
label_names: Indexed label names.
average_precisions: Whether to compute average precisions for labels or not.
Defaults to True.
recall_at_precision_thresholds: precision thresholds at which
to calculate recall
precision_at_recall_thresholds: recall thresholds at which
to calculate precision
Returns:
ClassificationMetrics which contains various classification metrics.
"""
num_correct = 0
per_label_confusions = PerLabelConfusions()
for _, predicted, expected in predictions:
predicted_label = label_names[predicted]
expected_label = label_names[expected]
if predicted_label == expected_label:
num_correct += 1
per_label_confusions.update(expected_label, "TP", 1)
else:
per_label_confusions.update(expected_label, "FN", 1)
per_label_confusions.update(predicted_label, "FP", 1)
accuracy = safe_division(num_correct, len(predictions))
macro_prf1_metrics = per_label_confusions.compute_metrics()
soft_metrics = (
compute_soft_metrics(
predictions,
label_names,
recall_at_precision_thresholds,
precision_at_recall_thresholds,
)
if average_precisions
else None
)
if len(label_names) == 2:
confusion_dict = per_label_confusions.label_confusions_map
# Since MCC is symmetric, it doesn't matter which label is 0 and which is 1
TP = confusion_dict[label_names[0]].TP
FP = confusion_dict[label_names[0]].FP
FN = confusion_dict[label_names[0]].FN
TN = confusion_dict[label_names[1]].TP
mcc: Optional[float] = compute_matthews_correlation_coefficients(TP, FP, FN, TN)
roc_auc: Optional[float] = compute_roc_auc(predictions)
else:
mcc = None
roc_auc = None
return ClassificationMetrics(
accuracy=accuracy,
macro_prf1_metrics=macro_prf1_metrics,
per_label_soft_scores=soft_metrics,
mcc=mcc,
roc_auc=roc_auc,
loss=loss,
)
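# Usage sketch (illustrative scores, label names and loss value):
#
#     preds = [
#         LabelPrediction([0.8, 0.2], 0, 0),
#         LabelPrediction([0.3, 0.7], 1, 0),
#         LabelPrediction([0.1, 0.9], 1, 1),
#     ]
#     metrics = compute_classification_metrics(preds, ["positive", "negative"], loss=0.42)
#     metrics.print_metrics()  # accuracy 66.67, per-label P/R/F1, soft metrics, MCC, ROC AUC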
def compute_multi_label_classification_metrics(
predictions: Sequence[LabelListPrediction],
label_names: Sequence[str],
loss: float,
average_precisions: bool = True,
recall_at_precision_thresholds: Sequence[float] = RECALL_AT_PRECISION_THRESHOLDS,
precision_at_recall_thresholds: Sequence[float] = PRECISION_AT_RECALL_THRESHOLDS,
) -> ClassificationMetrics:
"""
A general function that computes classification metrics given a list of multi-label
predictions.
Args:
predictions: multi-label predictions,
including the confidence score for each label.
label_names: Indexed label names.
average_precisions: Whether to compute average precisions for labels or not.
Defaults to True.
recall_at_precision_thresholds: precision thresholds at which
to calculate recall
precision_at_recall_thresholds: recall thresholds at which
to calculate precision
Returns:
ClassificationMetrics which contains various classification metrics.
"""
num_correct = 0
num_expected_labels = 0
per_label_confusions = PerLabelConfusions()
for _, predicted, expected in predictions:
for label_idx, label_name in enumerate(label_names):
num_expected_labels += 1
# "predicted" is in the format of n_hot_encoding
if predicted[label_idx] == 1:
if label_idx in expected: # TP
num_correct += 1
per_label_confusions.update(label_name, "TP", 1)
else: # FP
per_label_confusions.update(label_name, "FP", 1)
else:
if label_idx in expected: # FN
per_label_confusions.update(label_name, "FN", 1)
else: # TN, update correct num
num_correct += 1
accuracy = safe_division(num_correct, num_expected_labels)
macro_prf1_metrics = per_label_confusions.compute_metrics()
soft_metrics = (
compute_multi_label_soft_metrics(
predictions,
label_names,
recall_at_precision_thresholds,
precision_at_recall_thresholds,
)
if average_precisions
else None
)
roc_auc = compute_macro_avg(soft_metrics, "roc_auc") if average_precisions else None
if len(label_names) == 2:
confusion_dict = per_label_confusions.label_confusions_map
# Since MCC is symmetric, it doesn't matter which label is 0 and which is 1
TP = confusion_dict[label_names[0]].TP
FP = confusion_dict[label_names[0]].FP
FN = confusion_dict[label_names[0]].FN
TN = confusion_dict[label_names[1]].TP
mcc: Optional[float] = compute_matthews_correlation_coefficients(TP, FP, FN, TN)
else:
mcc = None
return ClassificationMetrics(
accuracy=accuracy,
macro_prf1_metrics=macro_prf1_metrics,
per_label_soft_scores=soft_metrics,
mcc=mcc,
roc_auc=roc_auc,
loss=loss,
)
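# Usage sketch (illustrative): "predicted_label" is an n-hot vector over label_names
# and "expected_label" is a list of gold label indices, matching how the loop above
# consumes them.
#
#     preds = [
#         LabelListPrediction([0.9, 0.2, 0.7], [1, 0, 1], [0, 2]),
#         LabelListPrediction([0.1, 0.8, 0.3], [0, 1, 0], [1]),
#     ]
#     metrics = compute_multi_label_classification_metrics(
#         preds, ["news", "sports", "tech"], loss=1.3
#     )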
def compute_multi_label_full_vector_classification_metrics(
predictions: Sequence[LabelListPrediction],
label_names: Sequence[str],
loss: float,
average_precisions: bool = True,
recall_at_precision_thresholds: Sequence[float] = RECALL_AT_PRECISION_THRESHOLDS,
precision_at_recall_thresholds: Sequence[float] = PRECISION_AT_RECALL_THRESHOLDS,
) -> ClassificationMetrics:
"""
A general function that computes classification metrics given a list of multi-label
predictions.
Args:
predictions: multi-label predictions,
including the confidence score for each label.
label_names: Indexed label names.
average_precisions: Whether to compute average precisions for labels or not.
Defaults to True.
recall_at_precision_thresholds: precision thresholds at which
to calculate recall
precision_at_recall_thresholds: recall thresholds at which
to calculate precision
Returns:
ClassificationMetrics which contains various classification metrics.
"""
num_correct = 0
num_expected_labels = 0
per_label_confusions = PerLabelConfusions()
for _, predicted, expected in predictions:
for label_idx, label_name in enumerate(label_names):
            if expected[label_idx] > 0:  # count expected positive labels
num_expected_labels += 1
# "predicted" is in the format of n_hot_encoding
if predicted[label_idx] == 1:
if expected[label_idx] > 0: # TP
num_correct += 1
per_label_confusions.update(label_name, "TP", 1)
else: # FP
per_label_confusions.update(label_name, "FP", 1)
else:
if expected[label_idx] > 0: # FN
per_label_confusions.update(label_name, "FN", 1)
accuracy = safe_division(num_correct, num_expected_labels)
macro_prf1_metrics = per_label_confusions.compute_metrics()
soft_metrics = (
compute_multi_label_soft_full_vector_metrics(
predictions,
label_names,
recall_at_precision_thresholds,
precision_at_recall_thresholds,
)
if average_precisions
else None
)
if len(label_names) == 2:
confusion_dict = per_label_confusions.label_confusions_map
# Since MCC is symmetric, it doesn't matter which label is 0 and which is 1
TP = confusion_dict[label_names[0]].TP
FP = confusion_dict[label_names[0]].FP
FN = confusion_dict[label_names[0]].FN
TN = confusion_dict[label_names[1]].TP
mcc: Optional[float] = compute_matthews_correlation_coefficients(TP, FP, FN, TN)
roc_auc: Optional[float] = compute_roc_auc(predictions)
else:
mcc = None
roc_auc = None
return ClassificationMetrics(
accuracy=accuracy,
macro_prf1_metrics=macro_prf1_metrics,
per_label_soft_scores=soft_metrics,
mcc=mcc,
roc_auc=roc_auc,
loss=loss,
)
def compute_macro_avg(
    soft_metrics: Dict[str, SoftClassificationMetrics], metric: str
) -> Optional[float]:
avg = 0
for metrics in soft_metrics.values():
metric_value = getattr(metrics, metric, None)
print(f"metirc value {metric_value}")
if metric_value is None:
return None
avg += metric_value
return avg / len(soft_metrics)
def compute_pairwise_ranking_metrics(
predictions: Sequence[int], scores: Sequence[float]
) -> PairwiseRankingMetrics:
"""
Computes metrics for pairwise ranking given sequences of predictions and scores
Args:
predictions : 1 if ranking was correct, 0 if ranking was incorrect
scores : score(higher-ranked-sample) - score(lower-ranked-sample)
Returns:
PairwiseRankingMetrics object
"""
return PairwiseRankingMetrics(
num_examples=len(predictions),
accuracy=safe_division(sum(predictions), len(predictions)),
average_score_difference=safe_division(sum(scores), len(predictions)),
)
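# Worked example (illustrative): predictions = [1, 1, 0] (two of three pairs ranked
# correctly) and scores = [0.5, 0.2, -0.1] give accuracy 2/3 and an average score
# difference of 0.2.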
def compute_regression_metrics(
predictions: Sequence[float], targets: Sequence[float]
) -> RegressionMetrics:
"""
    Computes metrics for regression tasks.
Args:
predictions: 1-D sequence of float predictions
targets: 1-D sequence of float labels
Returns:
RegressionMetrics object
"""
preds, targs = np.array(predictions), np.array(targets)
pred_mean, targ_mean = preds.mean(), targs.mean()
covariance = (preds - pred_mean).dot(targs - targ_mean) / preds.size
corr = covariance / preds.std() / targs.std()
mse = np.square(preds - targs).mean()
return RegressionMetrics(num_examples=len(preds), pearson_correlation=corr, mse=mse)
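# Worked example (illustrative): compute_regression_metrics([1.0, 2.0, 3.0], [1.0, 2.0, 4.0])
# yields a Pearson correlation of about 0.982 and an MSE of (0 + 0 + 1) / 3 ~= 0.333.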