modules
Top-level package for evalify.
evalify
¶
Evalify main module used for creating the verification experiments.
Creates experiments with embedding pairs to compare for face verification tasks, including positive pairs, negative pairs and metrics calculations using a highly optimized Einstein sum. Many operations are dispatched to canonical BLAS, cuBLAS, or other specialized routines. Extremely large arrays are split into smaller batches; every batch consumes roughly the maximum available memory.
Typical usage example:
1 2 |
|
Experiment
¶
Source code in evalify/evalify.py
class Experiment:
    """Builds verification pairs from embeddings and scores them."""

    def __init__(self) -> None:
        # Set to True at the end of a successful `run`; the evaluation
        # helpers check this flag before touching the results.
        self.experiment_sucess = False
        # Distance metrics converted to similarities, keyed by metric name.
        self.cached_predicted_as_similarity = {}

    def __call__(self, *args: Any, **kwds: Any) -> Any:
        """Calling the instance is shorthand for calling `run`."""
        return self.run(*args, **kwds)
def run(
    self,
    X: np.ndarray,
    y: np.ndarray,
    metrics: Union[str, Sequence[str]] = "cosine_similarity",
    same_class_samples: T_str_int = "full",
    different_class_samples: Union[str, int, Sequence[T_str_int]] = "minimal",
    batch_size: Union[T_str_int, None] = "best",
    shuffle: bool = False,
    seed: Union[int, None] = None,
    return_embeddings: bool = False,
    p: int = 3,
) -> pd.DataFrame:
    """Runs an experiment for face verification.

    Args:
        X: Embeddings array
        y: Targets for X as integers
        metrics:
            - 'cosine_similarity'
            - 'pearson_similarity'
            - 'cosine_distance'
            - 'euclidean_distance'
            - 'euclidean_distance_l2'
            - 'minkowski_distance'
            - 'manhattan_distance'
            - 'chebyshev_distance'
            - list/tuple containing more than one of them.
        same_class_samples:
            - 'full': Samples all possible images within each class to create
                all possible positive pairs.
            - int: Samples specific number of images for every class to create
                nC2 pairs where n is passed integer.
        different_class_samples:
            - 'full': Samples one image from every class with all possible pairs
                of different classes. This can grow exponentially as the number
                of images increase. (N, M) = (1, "full")
            - 'minimal': Samples one image from every class with one image of
                all other classes. (N, M) = (1, 1). (Default)
            - int: Samples one image from every class with provided number of
                images of every other class.
            - tuple or list: (N, M) Samples N images from every class with M
                images of every other class.
        batch_size:
            - 'best': Let the program decide based on available memory such that
                every batch will fit into the available memory. (Default)
            - int: Manually decide the batch_size.
            - None: No batching. All experiment and intermediate results must fit
                into memory or a MemoryError will be raised.
        shuffle: Whether to shuffle the returned experiment dataframe.
            Default: False.
        seed: Seed for the random generator used for sampling and shuffling.
            Default: None.
        return_embeddings: Whether to return the embeddings instead of indexes.
            Default: False
        p:
            The order of the norm of the difference. Must be `p >= 1`. Only
            valid with minkowski_distance as a metric. Default = 3

    Returns:
        pandas.DataFrame: A DataFrame representing the experiment results.

    Raises:
        ValueError: An error occurred with the provided arguments.

    Notes:
        `same_class_samples`:
            If the provided number is greater than the achievable for the class,
            the maximum possible combinations are used.
        `different_class_samples`:
            If the provided number is greater than the achievable for the class,
            the maximum possible combinations are used. (N, M) can also be
            ('full', 'full') but this will calculate all possible combinations
            between all possible negative samples. If the dataset is not small
            this will probably result in an extremely large array!
    """
    if isinstance(metrics, str):
        metrics = (metrics,)
    self._validate_args(
        metrics, same_class_samples, different_class_samples, batch_size, p
    )
    X, y = _validate_vectors(X, y)
    metric_fns = [metrics_caller.get(name) for name in metrics]

    # Start from a clean state: results of a previous run on this instance
    # (success flag, cached distance-to-similarity conversions) must not leak
    # into the evaluation of the new experiment.
    self.experiment_sucess = False
    self.cached_predicted_as_similarity = {}
    self.seed = seed
    self.rng = np.random.default_rng(self.seed)

    all_pairs = []
    for target in np.unique(y):
        all_pairs.extend(
            self._get_pairs(
                y,
                same_class_samples,
                different_class_samples,
                target,
            )
        )
    self.df = pd.DataFrame(
        data=all_pairs, columns=["img_a", "img_b", "target_a", "target_b", "target"]
    )
    experiment_size = len(self.df)
    if shuffle:
        self.df = self.df.sample(frac=1, random_state=seed)

    if batch_size == "best":
        batch_size = calculate_best_batch_size(X)
    elif batch_size is None:
        batch_size = experiment_size

    # Extra keyword arguments are computed once and shared by every metric.
    kwargs = {}
    if any(metric in METRICS_NEED_NORM for metric in metrics):
        kwargs["norms"] = get_norms(X)
    if any(metric in METRICS_NEED_ORDER for metric in metrics):
        kwargs["p"] = p

    img_a = self.df.img_a.to_numpy()
    img_b = self.df.img_b.to_numpy()
    # `np.array_split` rejects zero sections, so always use at least one
    # batch (this also covers an experiment with no pairs).
    n_batches = max(1, int(np.ceil(experiment_size / batch_size)))
    img_a_s = np.array_split(img_a, n_batches)
    img_b_s = np.array_split(img_b, n_batches)

    for metric, metric_fn in zip(metrics, metric_fns):
        self.df[metric] = np.hstack(
            [metric_fn(X, i, j, **kwargs) for i, j in zip(img_a_s, img_b_s)]
        )

    if return_embeddings:
        self.df["img_a"] = X[img_a].tolist()
        self.df["img_b"] = X[img_b].tolist()

    self.experiment_sucess = True
    self.metrics = metrics
    return self.df
def _get_pairs(
    self,
    y,
    same_class_samples,
    different_class_samples,
    target,
) -> List[Tuple]:
    """Generates experiment pairs for a single class.

    Args:
        y: Targets array.
        same_class_samples: 'full' or the number of images of `target` to
            combine into positive pairs.
        different_class_samples: 'full', 'minimal', an integer M, or an
            (N, M) pair, as documented on `run`.
        target: The class the pairs are generated for.

    Returns:
        A list of `(img_a, img_b, target_a, target_b, label)` tuples where
        `label` is 1 for positive pairs and 0 for negative pairs.
    """
    same_ixs_full = np.argwhere(y == target).ravel()
    if isinstance(same_class_samples, int):
        n_same = min(len(same_ixs_full), same_class_samples)
        # Sample without replacement, otherwise an image could be drawn twice
        # and paired with itself.
        same_ixs = self.rng.choice(same_ixs_full, n_same, replace=False)
    else:  # "full" (arguments are validated by the caller)
        same_ixs = same_ixs_full
    same_pairs = [
        (a, b, target, target, 1) for a, b in itertools.combinations(same_ixs, 2)
    ]

    different_ixs = np.argwhere(y != target).ravel()
    diff_df = pd.DataFrame(data={"ix": different_ixs, "target": y[different_ixs]})
    diff_df = diff_df.sample(frac=1, random_state=self.seed)

    if different_class_samples in ["full", "minimal"] or isinstance(
        different_class_samples, int
    ):
        # A bare value means one anchor image; an integer is M in (1, M).
        n_anchors = 1
        per_class = different_class_samples
    else:
        n_anchors, per_class = different_class_samples
        n_anchors = (
            len(same_ixs_full)
            if n_anchors == "full"
            else min(n_anchors, len(same_ixs_full))
        )

    if per_class == "minimal":
        diff_df = diff_df.drop_duplicates(subset=["target"])
    elif per_class != "full":
        # First `per_class` shuffled rows of every other class, grouped by
        # class in ascending order.
        diff_df = (
            diff_df.groupby("target")
            .head(per_class)
            .sort_values("target", kind="stable")
        )
    different_ixs = diff_df["ix"].to_numpy()

    anchors = self.rng.choice(same_ixs_full, n_anchors, replace=False)
    # Only pairs with a < b are kept, so a pair is not emitted again with its
    # members swapped when the other class is processed.
    different_pairs = [
        (a, b, target, y[b], 0)
        for a, b in itertools.product(anchors, different_ixs)
        if a < b
    ]
    return same_pairs + different_pairs
def _validate_args(
    self, metrics, same_class_samples, different_class_samples, batch_size, p
) -> None:
    """Validates passed arguments to Experiment.run() method.

    Raises:
        ValueError: If any argument has an unsupported type or value.
    """
    if same_class_samples != "full" and not isinstance(same_class_samples, int):
        raise ValueError(
            "`same_class_samples` argument must be one of 'full' or an integer "
            f"Received: same_class_samples={same_class_samples}"
        )

    if different_class_samples not in ("full", "minimal"):
        if not isinstance(different_class_samples, (int, list, tuple)):
            raise ValueError(
                "`different_class_samples` argument must be one of 'full', 'minimal', "
                "an integer, a list or tuple of integers or keyword 'full'. "
                f"Received: different_class_samples={different_class_samples}."
            )
        if isinstance(different_class_samples, (list, tuple)):
            valid_items = all(
                isinstance(i, int) or i == "full" for i in different_class_samples
            )
            if not valid_items or len(different_class_samples) != 2:
                raise ValueError(
                    "When passing `different_class_samples` as a tuple or list, "
                    "elements must be exactly two of integer type or keyword 'full' "
                    "(N, M). "
                    f"Received: different_class_samples={different_class_samples}."
                )

    if (
        batch_size != "best"
        and not isinstance(batch_size, int)
        and batch_size is not None
    ):
        raise ValueError(
            '`batch_size` argument must be either "best" or of type integer '
            f"Received: batch_size={batch_size} with type {type(batch_size)}."
        )
    # A non-positive batch size cannot be used to split the experiment;
    # reject it here instead of failing later while batching.
    if isinstance(batch_size, int) and batch_size < 1:
        raise ValueError(
            "`batch_size` argument must be a positive integer when given as a "
            f"number. Received: batch_size={batch_size}."
        )

    if any(metric not in metrics_caller for metric in metrics):
        raise ValueError(
            f"`metric` argument must be one of {tuple(metrics_caller.keys())} "
            f"Received: metric={metrics}"
        )

    if p < 1:
        raise ValueError(f"`p` must be an int and at least 1. Received: p={p}")
def find_optimal_cutoff(self):
    """Find the optimal cutoff point for every metric.

    The cutoff is the ROC threshold where the true positive rate is closest
    to the true negative rate (`tpr - (1 - fpr)` nearest to zero).

    Returns:
        dict: Metric names mapped to their optimal cutoff value.
    """
    self.check_experiment_run()
    self.optimal_cutoff = {}
    for metric in self.metrics:
        # Score with similarities so that larger values mean "same identity"
        # for distance metrics too, as the other ROC-based methods do.
        predicted = self.predicted_as_similarity(metric)
        fpr, tpr, thresholds = roc_curve(self.df["target"], predicted)
        best_ix = np.argmin(np.abs(tpr - (1 - fpr)))
        cutoff = float(thresholds[best_ix])
        if metric in REVERSE_DISTANCE_TO_SIMILARITY:
            # Express the cutoff in the metric's own (distance) scale.
            cutoff = REVERSE_DISTANCE_TO_SIMILARITY.get(metric)(cutoff)
        self.optimal_cutoff[metric] = cutoff
    return self.optimal_cutoff
def find_threshold_at_fpr(self, fpr: float):
    """Finds optimal threshold at a given FPR.

    Args:
        fpr: False positive rate to find best threshold for.

    Returns:
        dict: A dictionary with keys as metrics and values as dictionaries
            holding the selected `FPR`, `TPR` and `Threshold`.

    Raises:
        ValueError: If `fpr` is not between 0 and 1.
    """
    self.check_experiment_run()
    if not 0 <= fpr <= 1:
        raise ValueError(
            "`fpr` must be between 0 and 1. " f"Received wanted_fpr={fpr}"
        )

    threshold_at_fpr = {}
    for metric in self.metrics:
        scores = self.predicted_as_similarity(metric)
        fpr_values, tpr_values, cutoffs = roc_curve(
            self.df["target"], scores, drop_intermediate=False
        )
        roc_table = pd.DataFrame(
            {"FPR": fpr_values, "TPR": tpr_values, "Threshold": cutoffs}
        )
        lo = np.searchsorted(roc_table["FPR"], fpr, side="left")
        hi = np.searchsorted(roc_table["FPR"], fpr, side="right")

        if fpr == 0:
            chosen_ix = hi
        elif fpr == 1 or lo == hi:
            chosen_ix = lo
        else:
            # NOTE(review): `lo != hi` means an exact FPR match sits at `lo`,
            # so this comparison looks like it always favours `lo` - confirm.
            gap_lo = abs(roc_table.iloc[lo].FPR - fpr)
            gap_hi = abs(roc_table.iloc[hi].FPR - fpr)
            chosen_ix = lo if gap_lo < gap_hi else hi

        best = roc_table.iloc[chosen_ix].to_dict()
        if metric in REVERSE_DISTANCE_TO_SIMILARITY:
            # Report the threshold in the metric's own scale.
            to_metric_scale = REVERSE_DISTANCE_TO_SIMILARITY.get(metric)
            best["Threshold"] = to_metric_scale(best["Threshold"])
        threshold_at_fpr[metric] = best
    return threshold_at_fpr
def get_binary_prediction(self, metric, threshold):
    """Find binary prediction from distance or similarity.

    For distance metrics a pair is predicted as a match (1) when its value
    is below `threshold`; for every other metric when it is above it.

    Args:
        metric: Metric name for the desired prediction.
        threshold: Cut off threshold.

    Returns:
        pd.Series: Binary predictions.
    """
    scores = self.df[metric]
    # One vectorised comparison instead of a Python call per row.
    if metric in DISTANCE_TO_SIMILARITY:
        is_match = scores < threshold
    else:
        is_match = scores > threshold
    return is_match.astype("int64")
def evaluate_at_threshold(self, threshold: float, metric: str):
    """Evaluate performance of one metric at a specific threshold.

    Args:
        threshold: Cut-off threshold.
        metric: Metric to use. Must be one of the metrics the experiment was
            run with.

    Returns:
        dict: A dict containing all evaluation metrics. Rates whose
            denominator is zero are not guarded (presumably NaN with numpy
            integer counts).
    """
    self.check_experiment_run(metric)
    predicted = self.get_binary_prediction(metric, threshold)
    # Fix the label order so the matrix is always 2x2, even when only one
    # class is present among the targets and predictions.
    cm = confusion_matrix(self.df["target"], predicted, labels=[0, 1])
    tn, fp, fn, tp = cm.ravel()
    TPR = tp / (tp + fn)  # recall / true positive rate
    TNR = tn / (tn + fp)  # true negative rate
    PPV = tp / (tp + fp)  # precision / positive predicted value
    NPV = tn / (tn + fn)  # negative predictive value
    FPR = fp / (fp + tn)  # false positive rate
    FNR = 1 - TPR  # false negative rate
    FDR = 1 - PPV  # false discovery rate
    FOR = 1 - NPV  # false omission rate
    F1 = 2 * (PPV * TPR) / (PPV + TPR)

    evaluation = {
        "TPR": TPR,
        "TNR": TNR,
        "PPV": PPV,
        "NPV": NPV,
        "FPR": FPR,
        "FNR": FNR,
        "FDR": FDR,
        "FOR": FOR,
        "F1": F1,
    }
    self.metrics_evaluation = {metric: evaluation}
    return evaluation
def check_experiment_run(self, metric=None):
    """Ensure an experiment has been run before results are evaluated.

    Args:
        metric: Optional metric name that must be one of the metrics used in
            the experiment.

    Returns:
        bool: True when every check passes.

    Raises:
        NotImplementedError: If `run` has not completed successfully yet.
        ValueError: If `metric` was not used while running the experiment.
    """
    # Name of the calling function, used to make the error messages specific.
    caller = sys._getframe().f_back.f_code.co_name
    if not self.experiment_sucess:
        raise NotImplementedError(
            f"`{caller}` function can only be run after running `run`."
        )
    if metric is not None and metric not in self.metrics:
        raise ValueError(
            f"`{caller}` function can only be called with `metric` from "
            f"{self.metrics} which were used while running the experiment"
        )
    return True
def get_roc_auc(self) -> OrderedDict:
    """Find ROC AUC for all the metrics used.

    Returns:
        collections.OrderedDict: AUC per metric, ordered from the highest
            AUC to the lowest.
    """
    self.check_experiment_run()
    self.roc_auc = {}
    for metric in self.metrics:
        scores = self.predicted_as_similarity(metric)
        false_pos_rate, true_pos_rate, _ = roc_curve(
            self.df["target"], scores, drop_intermediate=False
        )
        self.roc_auc[metric] = auc(false_pos_rate, true_pos_rate)

    ranked = sorted(self.roc_auc.items(), key=lambda item: item[1], reverse=True)
    self.roc_auc = OrderedDict(ranked)
    return self.roc_auc
def predicted_as_similarity(self, metric: str) -> pd.Series:
    """Convert distance metrics to a similarity measure.

    Converted values are cached per metric on the instance.

    Args:
        metric: distance metric to convert to similarity. If a similarity
            metric is passed, it gets returned unchanged.

    Returns:
        pd.Series: Converted distance to similarity.
    """
    scores = self.df[metric]
    if metric not in DISTANCE_TO_SIMILARITY:
        return scores

    cache = self.cached_predicted_as_similarity
    if metric not in cache:
        cache[metric] = DISTANCE_TO_SIMILARITY.get(metric)(scores)
    return cache[metric]
def calculate_eer(self) -> OrderedDict:
    """Calculates the Equal Error Rate (EER) for each metric.

    The EER is taken at the ROC point where the false negative rate and the
    false positive rate are closest to each other.

    Returns:
        OrderedDict: A dictionary containing the EER value and threshold for
            each metric, sorted in ascending order of the EER values.
            Example: {'metric1': {'EER': 0.123, 'Threshold': 0.456},
                      ...}
    """
    self.check_experiment_run()
    self.eer = {}
    for metric in self.metrics:
        scores = self.predicted_as_similarity(metric)
        labels = self.df["target"]
        fpr, tpr, thresholds = roc_curve(
            labels, scores, pos_label=1, drop_intermediate=False
        )
        fnr = 1 - tpr
        # Index of the point where FNR and FPR are closest.
        eer_ix = np.nanargmin(np.absolute(fnr - fpr))
        eer_threshold = thresholds[eer_ix]
        if metric in REVERSE_DISTANCE_TO_SIMILARITY:
            eer_threshold = REVERSE_DISTANCE_TO_SIMILARITY.get(metric)(eer_threshold)
        # In theory both rates are equal at this point; in practice they can
        # differ slightly, so their mean is reported.
        self.eer[metric] = {
            'EER': (fpr[eer_ix] + fnr[eer_ix]) / 2,
            'Threshold': eer_threshold,
        }

    by_eer = sorted(self.eer.items(), key=lambda item: item[1]['EER'], reverse=False)
    self.eer = OrderedDict(by_eer)
    return self.eer
calculate_eer(self)
¶
Calculates the Equal Error Rate (EER) for each metric.
Returns an ordered dictionary containing the EER value and threshold for each metric. The metrics are sorted in ascending order based on the EER values.
Returns:
Type | Description |
---|---|
OrderedDict |
A dictionary containing the EER value and threshold for each metric. The metrics are sorted in ascending order based on the EER values. Example: {'metric1': {'EER': 0.123, 'Threshold': 0.456}, ...} |
Source code in evalify/evalify.py
def calculate_eer(self) -> OrderedDict:
    """Calculates the Equal Error Rate (EER) for each metric.

    The EER is taken at the ROC point where the false negative rate and the
    false positive rate are closest to each other.

    Returns:
        OrderedDict: A dictionary containing the EER value and threshold for
            each metric, sorted in ascending order of the EER values.
            Example: {'metric1': {'EER': 0.123, 'Threshold': 0.456},
                      ...}
    """
    self.check_experiment_run()
    self.eer = {}
    for metric in self.metrics:
        scores = self.predicted_as_similarity(metric)
        labels = self.df["target"]
        fpr, tpr, thresholds = roc_curve(
            labels, scores, pos_label=1, drop_intermediate=False
        )
        fnr = 1 - tpr
        # Index of the point where FNR and FPR are closest.
        eer_ix = np.nanargmin(np.absolute(fnr - fpr))
        eer_threshold = thresholds[eer_ix]
        if metric in REVERSE_DISTANCE_TO_SIMILARITY:
            eer_threshold = REVERSE_DISTANCE_TO_SIMILARITY.get(metric)(eer_threshold)
        # In theory both rates are equal at this point; in practice they can
        # differ slightly, so their mean is reported.
        self.eer[metric] = {
            'EER': (fpr[eer_ix] + fnr[eer_ix]) / 2,
            'Threshold': eer_threshold,
        }

    by_eer = sorted(self.eer.items(), key=lambda item: item[1]['EER'], reverse=False)
    self.eer = OrderedDict(by_eer)
    return self.eer
evaluate_at_threshold(self, threshold, metric)
¶
Evaluate performance at specific threshold
Parameters:
Name | Type | Description | Default |
---|---|---|---|
threshold |
float |
Cut-off threshold. |
required |
metric |
str |
Metric to use. |
required |
Returns:
Type | Description |
---|---|
dict |
A dict containing all evaluation metrics. |
Source code in evalify/evalify.py
def evaluate_at_threshold(self, threshold: float, metric: str):
    """Evaluate performance of one metric at a specific threshold.

    Args:
        threshold: Cut-off threshold.
        metric: Metric to use. Must be one of the metrics the experiment was
            run with.

    Returns:
        dict: A dict containing all evaluation metrics. Rates whose
            denominator is zero are not guarded (presumably NaN with numpy
            integer counts).
    """
    self.check_experiment_run(metric)
    predicted = self.get_binary_prediction(metric, threshold)
    # Fix the label order so the matrix is always 2x2, even when only one
    # class is present among the targets and predictions.
    cm = confusion_matrix(self.df["target"], predicted, labels=[0, 1])
    tn, fp, fn, tp = cm.ravel()
    TPR = tp / (tp + fn)  # recall / true positive rate
    TNR = tn / (tn + fp)  # true negative rate
    PPV = tp / (tp + fp)  # precision / positive predicted value
    NPV = tn / (tn + fn)  # negative predictive value
    FPR = fp / (fp + tn)  # false positive rate
    FNR = 1 - TPR  # false negative rate
    FDR = 1 - PPV  # false discovery rate
    FOR = 1 - NPV  # false omission rate
    F1 = 2 * (PPV * TPR) / (PPV + TPR)

    evaluation = {
        "TPR": TPR,
        "TNR": TNR,
        "PPV": PPV,
        "NPV": NPV,
        "FPR": FPR,
        "FNR": FNR,
        "FDR": FDR,
        "FOR": FOR,
        "F1": F1,
    }
    self.metrics_evaluation = {metric: evaluation}
    return evaluation
find_optimal_cutoff(self)
¶
Find the optimal cutoff point
Returns:
Type | Description |
---|---|
dict |
Optimal cutoff value for every metric used in the experiment. |
Source code in evalify/evalify.py
def find_optimal_cutoff(self):
    """Find the optimal cutoff point for every metric.

    The cutoff is the ROC threshold where the true positive rate is closest
    to the true negative rate (`tpr - (1 - fpr)` nearest to zero).

    Returns:
        dict: Metric names mapped to their optimal cutoff value.
    """
    self.check_experiment_run()
    self.optimal_cutoff = {}
    for metric in self.metrics:
        # Score with similarities so that larger values mean "same identity"
        # for distance metrics too, as the other ROC-based methods do.
        predicted = self.predicted_as_similarity(metric)
        fpr, tpr, thresholds = roc_curve(self.df["target"], predicted)
        best_ix = np.argmin(np.abs(tpr - (1 - fpr)))
        cutoff = float(thresholds[best_ix])
        if metric in REVERSE_DISTANCE_TO_SIMILARITY:
            # Express the cutoff in the metric's own (distance) scale.
            cutoff = REVERSE_DISTANCE_TO_SIMILARITY.get(metric)(cutoff)
        self.optimal_cutoff[metric] = cutoff
    return self.optimal_cutoff
find_threshold_at_fpr(self, fpr)
¶
Finds optimal threshold at a given FPR.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fpr |
float |
False positive rate to find best threshold for. |
required |
Returns:
Type | Description |
---|---|
dict |
A dictionary with keys as metrics and values as thresholds. |
Exceptions:
Type | Description |
---|---|
ValueError |
If `fpr` is not between 0 and 1. |
Source code in evalify/evalify.py
def find_threshold_at_fpr(self, fpr: float):
    """Finds optimal threshold at a given FPR.

    Args:
        fpr: False positive rate to find best threshold for.

    Returns:
        dict: A dictionary with keys as metrics and values as dictionaries
            holding the selected `FPR`, `TPR` and `Threshold`.

    Raises:
        ValueError: If `fpr` is not between 0 and 1.
    """
    self.check_experiment_run()
    if not 0 <= fpr <= 1:
        raise ValueError(
            "`fpr` must be between 0 and 1. " f"Received wanted_fpr={fpr}"
        )

    threshold_at_fpr = {}
    for metric in self.metrics:
        scores = self.predicted_as_similarity(metric)
        fpr_values, tpr_values, cutoffs = roc_curve(
            self.df["target"], scores, drop_intermediate=False
        )
        roc_table = pd.DataFrame(
            {"FPR": fpr_values, "TPR": tpr_values, "Threshold": cutoffs}
        )
        lo = np.searchsorted(roc_table["FPR"], fpr, side="left")
        hi = np.searchsorted(roc_table["FPR"], fpr, side="right")

        if fpr == 0:
            chosen_ix = hi
        elif fpr == 1 or lo == hi:
            chosen_ix = lo
        else:
            # NOTE(review): `lo != hi` means an exact FPR match sits at `lo`,
            # so this comparison looks like it always favours `lo` - confirm.
            gap_lo = abs(roc_table.iloc[lo].FPR - fpr)
            gap_hi = abs(roc_table.iloc[hi].FPR - fpr)
            chosen_ix = lo if gap_lo < gap_hi else hi

        best = roc_table.iloc[chosen_ix].to_dict()
        if metric in REVERSE_DISTANCE_TO_SIMILARITY:
            # Report the threshold in the metric's own scale.
            to_metric_scale = REVERSE_DISTANCE_TO_SIMILARITY.get(metric)
            best["Threshold"] = to_metric_scale(best["Threshold"])
        threshold_at_fpr[metric] = best
    return threshold_at_fpr
get_binary_prediction(self, metric, threshold)
¶
Find binary prediction from distance or similarity.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
metric |
Metric name for the desired prediction. |
required | |
threshold |
Cut off threshold. |
required |
Returns:
Type | Description |
---|---|
pd.Series |
Binary predictions. |
Source code in evalify/evalify.py
def get_binary_prediction(self, metric, threshold):
    """Find binary prediction from distance or similarity.

    For distance metrics a pair is predicted as a match (1) when its value
    is below `threshold`; for every other metric when it is above it.

    Args:
        metric: Metric name for the desired prediction.
        threshold: Cut off threshold.

    Returns:
        pd.Series: Binary predictions.
    """
    scores = self.df[metric]
    # One vectorised comparison instead of a Python call per row.
    if metric in DISTANCE_TO_SIMILARITY:
        is_match = scores < threshold
    else:
        is_match = scores > threshold
    return is_match.astype("int64")
get_roc_auc(self)
¶
Find ROC AUC for all the metrics used.
Returns:
Type | Description |
---|---|
collections.OrderedDict |
An OrderedDict with AUC for all metrics. |
Source code in evalify/evalify.py
def get_roc_auc(self) -> OrderedDict:
    """Find ROC AUC for all the metrics used.

    Returns:
        collections.OrderedDict: AUC per metric, ordered from the highest
            AUC to the lowest.
    """
    self.check_experiment_run()
    self.roc_auc = {}
    for metric in self.metrics:
        scores = self.predicted_as_similarity(metric)
        false_pos_rate, true_pos_rate, _ = roc_curve(
            self.df["target"], scores, drop_intermediate=False
        )
        self.roc_auc[metric] = auc(false_pos_rate, true_pos_rate)

    ranked = sorted(self.roc_auc.items(), key=lambda item: item[1], reverse=True)
    self.roc_auc = OrderedDict(ranked)
    return self.roc_auc
predicted_as_similarity(self, metric)
¶
Convert distance metrics to a similarity measure.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
metric |
str |
distance metric to convert to similarity. If a similarity metric is passed, It gets returned unchanged. |
required |
Returns:
Type | Description |
---|---|
pd.Series |
Converted distance to similarity. |
Source code in evalify/evalify.py
def predicted_as_similarity(self, metric: str) -> pd.Series:
    """Convert distance metrics to a similarity measure.

    Converted values are cached per metric on the instance.

    Args:
        metric: distance metric to convert to similarity. If a similarity
            metric is passed, it gets returned unchanged.

    Returns:
        pd.Series: Converted distance to similarity.
    """
    scores = self.df[metric]
    if metric not in DISTANCE_TO_SIMILARITY:
        return scores

    cache = self.cached_predicted_as_similarity
    if metric not in cache:
        cache[metric] = DISTANCE_TO_SIMILARITY.get(metric)(scores)
    return cache[metric]
run(self, X, y, metrics='cosine_similarity', same_class_samples='full', different_class_samples='minimal', batch_size='best', shuffle=False, seed=None, return_embeddings=False, p=3)
¶
Runs an experiment for face verification
Parameters:
Name | Type | Description | Default |
---|---|---|---|
X |
ndarray |
Embeddings array |
required |
y |
ndarray |
Targets for X as integers |
required |
metrics |
Union[str, Sequence[str]] |
|
'cosine_similarity' |
same_class_samples |
Union[str, int] |
|
'full' |
different_class_samples |
Union[str, int, Sequence[Union[str, int]]] |
|
'minimal' |
batch_size |
Union[str, int] |
|
'best' |
shuffle |
bool |
Whether to shuffle the returned experiment dataframe. Default: False. |
False |
return_embeddings |
bool |
Whether to return the embeddings instead of indexes. Default: False |
False |
p |
int |
The order of the norm of the difference. Should be `p >= 1`. Only valid with minkowski_distance as a metric. Default = 3 |
3 |
Returns:
Type | Description |
---|---|
pandas.DataFrame |
A DataFrame representing the experiment results. |
Exceptions:
Type | Description |
---|---|
ValueError |
An error occurred with the provided arguments. |
Notes
same_class_samples
:
If the provided number is greater than the achievable for the class,
the maximum possible combinations are used.
different_class_samples
:
If the provided number is greater than the achievable for the class,
the maximum possible combinations are used. (N, M) can also be
('full', 'full') but this will calculate all possible combinations
between all possible negative samples. If the dataset is not small
this will probably result in an extremely large array!.
Source code in evalify/evalify.py
def run(
    self,
    X: np.ndarray,
    y: np.ndarray,
    metrics: Union[str, Sequence[str]] = "cosine_similarity",
    same_class_samples: T_str_int = "full",
    different_class_samples: Union[str, int, Sequence[T_str_int]] = "minimal",
    batch_size: Union[T_str_int, None] = "best",
    shuffle: bool = False,
    seed: Union[int, None] = None,
    return_embeddings: bool = False,
    p: int = 3,
) -> pd.DataFrame:
    """Runs an experiment for face verification.

    Args:
        X: Embeddings array
        y: Targets for X as integers
        metrics:
            - 'cosine_similarity'
            - 'pearson_similarity'
            - 'cosine_distance'
            - 'euclidean_distance'
            - 'euclidean_distance_l2'
            - 'minkowski_distance'
            - 'manhattan_distance'
            - 'chebyshev_distance'
            - list/tuple containing more than one of them.
        same_class_samples:
            - 'full': Samples all possible images within each class to create
                all possible positive pairs.
            - int: Samples specific number of images for every class to create
                nC2 pairs where n is passed integer.
        different_class_samples:
            - 'full': Samples one image from every class with all possible pairs
                of different classes. This can grow exponentially as the number
                of images increase. (N, M) = (1, "full")
            - 'minimal': Samples one image from every class with one image of
                all other classes. (N, M) = (1, 1). (Default)
            - int: Samples one image from every class with provided number of
                images of every other class.
            - tuple or list: (N, M) Samples N images from every class with M
                images of every other class.
        batch_size:
            - 'best': Let the program decide based on available memory such that
                every batch will fit into the available memory. (Default)
            - int: Manually decide the batch_size.
            - None: No batching. All experiment and intermediate results must fit
                into memory or a MemoryError will be raised.
        shuffle: Whether to shuffle the returned experiment dataframe.
            Default: False.
        seed: Seed for the random generator used for sampling and shuffling.
            Default: None.
        return_embeddings: Whether to return the embeddings instead of indexes.
            Default: False
        p:
            The order of the norm of the difference. Must be `p >= 1`. Only
            valid with minkowski_distance as a metric. Default = 3

    Returns:
        pandas.DataFrame: A DataFrame representing the experiment results.

    Raises:
        ValueError: An error occurred with the provided arguments.

    Notes:
        `same_class_samples`:
            If the provided number is greater than the achievable for the class,
            the maximum possible combinations are used.
        `different_class_samples`:
            If the provided number is greater than the achievable for the class,
            the maximum possible combinations are used. (N, M) can also be
            ('full', 'full') but this will calculate all possible combinations
            between all possible negative samples. If the dataset is not small
            this will probably result in an extremely large array!
    """
    if isinstance(metrics, str):
        metrics = (metrics,)
    self._validate_args(
        metrics, same_class_samples, different_class_samples, batch_size, p
    )
    X, y = _validate_vectors(X, y)
    metric_fns = [metrics_caller.get(name) for name in metrics]

    # Start from a clean state: results of a previous run on this instance
    # (success flag, cached distance-to-similarity conversions) must not leak
    # into the evaluation of the new experiment.
    self.experiment_sucess = False
    self.cached_predicted_as_similarity = {}
    self.seed = seed
    self.rng = np.random.default_rng(self.seed)

    all_pairs = []
    for target in np.unique(y):
        all_pairs.extend(
            self._get_pairs(
                y,
                same_class_samples,
                different_class_samples,
                target,
            )
        )
    self.df = pd.DataFrame(
        data=all_pairs, columns=["img_a", "img_b", "target_a", "target_b", "target"]
    )
    experiment_size = len(self.df)
    if shuffle:
        self.df = self.df.sample(frac=1, random_state=seed)

    if batch_size == "best":
        batch_size = calculate_best_batch_size(X)
    elif batch_size is None:
        batch_size = experiment_size

    # Extra keyword arguments are computed once and shared by every metric.
    kwargs = {}
    if any(metric in METRICS_NEED_NORM for metric in metrics):
        kwargs["norms"] = get_norms(X)
    if any(metric in METRICS_NEED_ORDER for metric in metrics):
        kwargs["p"] = p

    img_a = self.df.img_a.to_numpy()
    img_b = self.df.img_b.to_numpy()
    # `np.array_split` rejects zero sections, so always use at least one
    # batch (this also covers an experiment with no pairs).
    n_batches = max(1, int(np.ceil(experiment_size / batch_size)))
    img_a_s = np.array_split(img_a, n_batches)
    img_b_s = np.array_split(img_b, n_batches)

    for metric, metric_fn in zip(metrics, metric_fns):
        self.df[metric] = np.hstack(
            [metric_fn(X, i, j, **kwargs) for i, j in zip(img_a_s, img_b_s)]
        )

    if return_embeddings:
        self.df["img_a"] = X[img_a].tolist()
        self.df["img_b"] = X[img_b].tolist()

    self.experiment_sucess = True
    self.metrics = metrics
    return self.df
metrics
¶
Evalify metrics module used for calculating the evaluation metrics.
Optimized calculations using Einstein sum. Embedding and norm arrays are indexed with every split, and calculations happen over large data chunks very quickly.
utils
¶
Evalify utils module contains various utilities serving other modules.
calculate_best_batch_size(X, available_mem=None)
¶
Calculate maximum rows to fetch per batch without going out of memory.
We need 3 big arrays to be held in memory (A, B, A*B)
Source code in evalify/utils.py
def calculate_best_batch_size(X, available_mem=None):
    """Calculate maximum rows to fetch per batch without going out of memory.

    We need 3 big arrays to be held in memory (A, B, A*B). When more than
    2 GB are available, 1 GB is kept free and the rest is shared by the three
    arrays; otherwise the budget is divided by 5 instead of 3.

    Args:
        X: Embeddings array; the byte size of its first row is the row cost.
        available_mem: Available memory in bytes. When None it is obtained
            from `_calc_available_memory()`.

    Returns:
        int: Number of rows per batch, never less than 1.
    """
    if available_mem is None:
        available_mem = _calc_available_memory()
    row_bytes = X[0].nbytes

    if available_mem > 2 * GB_TO_BYTE:
        # Reserve 1 GB *before* converting bytes to rows.
        usable_bytes, n_shares = available_mem - GB_TO_BYTE, 3
    else:
        usable_bytes, n_shares = available_mem, 5

    max_total_rows = int(usable_bytes // row_bytes)
    # A batch size of zero would break the batching done by the caller.
    return max(1, max_total_rows // n_shares)