From 3c3857947e1bef01b14b8b83a4bacbb48814b9f7 Mon Sep 17 00:00:00 2001
From: Edward Malov
Date: Mon, 18 Dec 2023 08:19:28 +0000
Subject: [PATCH] Align Spark and pandas coverage computation

---
 replay/metrics/coverage.py | 41 +++++++++++++++++++++++------------------
 1 file changed, 23 insertions(+), 18 deletions(-)

diff --git a/replay/metrics/coverage.py b/replay/metrics/coverage.py
index 64e17df19..645fb00cb 100644
--- a/replay/metrics/coverage.py
+++ b/replay/metrics/coverage.py
@@ -16,43 +16,40 @@ class Coverage(Metric):
 
     * take ``K`` recommendations with the biggest ``score`` for each ``user_id``
     * count the number of distinct ``item_id`` in these recommendations
-    * divide it by the number of distinct items in the whole dataset
+    * divide it by the number of distinct items in the train dataset provided to the metric call
 
     >>> recommendations
       query_id  item_id  rating
     0        1        3     0.6
     1        1        7     0.5
     2        1       10     0.4
     3        1       11     0.3
     4        1        2     0.2
     5        2        5     0.6
     6        2        8     0.5
     7        2       11     0.4
     8        2        1     0.3
     9        2        3     0.2
     10       3        4     1.0
     11       3        9     0.5
     12       3        2     0.1
-    >>> groundtruth
+    >>> train
       query_id  item_id
     0        1        5
     1        1        6
-    2        1        7
-    3        1        8
-    4        1        9
-    5        1       10
-    6        2        6
-    7        2        7
-    8        2        4
-    9        2       10
-    10       2       11
-    11       3        1
+    2        1        8
+    3        1        9
+    4        1        2
+    5        2        5
+    6        2        8
+    7        2       11
+    8        2        1
+    9        2        3
+    10       3        4
+    11       3        9
     12       3        2
-    13       3        3
-    14       3        4
-    15       3        5
-    >>> Coverage(2)(recommendations, groundtruth)
-    {'Coverage@2': 0.5454545454545454}
+    >>> Coverage(2)(recommendations, train)
+    {'Coverage@2': 0.5555555555555556}
 
     """
 
@@ -112,7 +109,15 @@ def _spark_compute(
 
         metrics = []
         for k in self.topk:
-            res = recs.filter(sf.col("best_position") <= k).count() / item_count
+            res = (
+                recs.filter(sf.col("best_position") <= k)
+                .select(self.item_column)
+                .distinct()
+                .join(
+                    train.select(self.item_column).distinct(), on=self.item_column
+                )
+                .count() / item_count
+            )
             metrics.append(res)
 
         if self._allow_caching:
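
For a quick sanity check of the new expected value: with K = 2 the top-rated recommendations are items {3, 7} for user 1, {5, 8} for user 2, and {4, 9} for user 3, six distinct items in total. The train table contains nine distinct items, and recommended item 7 never occurs in train, so the intersection has five items and Coverage@2 = 5 / 9 = 0.5555... The snippet below is a minimal pandas sketch of this computation using the doctest data; the helper name coverage_at_k is hypothetical, and the sketch illustrates the metric's definition rather than reproducing the library's implementation.

import pandas as pd

# Example data copied from the docstring above.
recommendations = pd.DataFrame(
    {
        "query_id": [1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3],
        "item_id": [3, 7, 10, 11, 2, 5, 8, 11, 1, 3, 4, 9, 2],
        "rating": [0.6, 0.5, 0.4, 0.3, 0.2, 0.6, 0.5, 0.4, 0.3, 0.2, 1.0, 0.5, 0.1],
    }
)
train = pd.DataFrame(
    {
        "query_id": [1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3],
        "item_id": [5, 6, 8, 9, 2, 5, 8, 11, 1, 3, 4, 9, 2],
    }
)

def coverage_at_k(recs: pd.DataFrame, train: pd.DataFrame, k: int) -> float:
    # Take the k highest-rated recommendations per user.
    top_k = recs.sort_values("rating", ascending=False).groupby("query_id").head(k)
    # Distinct recommended items that also occur in train,
    # divided by the number of distinct items in train.
    train_items = set(train["item_id"])
    hit_items = set(top_k["item_id"]) & train_items
    return len(hit_items) / len(train_items)

print(coverage_at_k(recommendations, train, 2))  # 0.5555555555555556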
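
A note on the design choice: judging from the hunk, the old Spark expression counted every recommendation that passed the best_position filter and divided by the number of distinct train items, so recommended items that never occur in train still inflated the numerator. The new expression keeps only distinct recommended items that also appear in train, which matches the pandas computation. The doctest data was changed accordingly: with the old groundtruth table every recommended item was present in the ground truth, so both formulas gave 6/11 and the test could not tell them apart; in the new train table item 7 is absent, and the aligned formula gives 5/9 where the old one would give 6/9.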