1

Ajoute la richesse chromatique par set

This commit is contained in:
2025-12-02 16:59:59 +01:00
parent 231a9af28d
commit c2bf12e3fe
12 changed files with 592 additions and 11 deletions

View File

@@ -0,0 +1,150 @@
"""Métriques de richesse chromatique par set."""
import csv
from pathlib import Path
from typing import Dict, Iterable, List, Sequence
from lib.filesystem import ensure_parent_dir
from lib.rebrickable.stats import compute_median, read_rows
def load_colors_by_set(path: Path) -> List[dict]:
"""Charge colors_by_set.csv en mémoire."""
return read_rows(path)
def load_sets(path: Path) -> Dict[str, dict]:
"""Indexe les sets enrichis par set_num."""
sets: Dict[str, dict] = {}
with path.open() as csv_file:
reader = csv.DictReader(csv_file)
for row in reader:
sets[row["set_num"]] = row
return sets
def group_by_set(rows: Iterable[dict]) -> Dict[str, List[dict]]:
"""Regroupe les couleurs par set."""
grouped: Dict[str, List[dict]] = {}
for row in rows:
set_rows = grouped.get(row["set_num"])
if set_rows is None:
set_rows = []
grouped[row["set_num"]] = set_rows
set_rows.append(row)
return grouped
def build_richness_by_set(
colors_by_set_path: Path,
sets_enriched_path: Path,
) -> List[dict]:
"""Construit les métriques de richesse chromatique par set."""
colors = load_colors_by_set(colors_by_set_path)
sets_lookup = load_sets(sets_enriched_path)
grouped = group_by_set(colors)
richness: List[dict] = []
for set_num, set_rows in grouped.items():
total_non_spare = sum(int(row["quantity_non_spare"]) for row in set_rows)
colors_distinct = len(set_rows)
colors_minifig = sum(1 for row in set_rows if int(row["quantity_minifig"]) > 0)
colors_non_minifig = sum(1 for row in set_rows if int(row["quantity_non_minifig"]) > 0)
sorted_by_quantity = sorted(set_rows, key=lambda row: int(row["quantity_non_spare"]), reverse=True)
top_color = sorted_by_quantity[0]
top3_total = sum(int(row["quantity_non_spare"]) for row in sorted_by_quantity[:3])
top_share = int(top_color["quantity_non_spare"]) / total_non_spare
top3_share = top3_total / total_non_spare
set_row = sets_lookup[set_num]
richness.append(
{
"set_num": set_num,
"set_id": set_row["set_id"],
"name": set_row["name"],
"year": set_row["year"],
"in_collection": set_row["in_collection"],
"colors_distinct": str(colors_distinct),
"colors_minifig": str(colors_minifig),
"colors_non_minifig": str(colors_non_minifig),
"total_parts_non_spare": str(total_non_spare),
"top_color_name": top_color["color_name"],
"top_color_share": f"{top_share:.4f}",
"top3_share": f"{top3_share:.4f}",
}
)
richness.sort(key=lambda row: (-int(row["colors_distinct"]), row["set_num"]))
return richness
def build_richness_by_year(richness_rows: Iterable[dict]) -> List[dict]:
"""Agrège les métriques de richesse par année."""
grouped: Dict[str, List[dict]] = {}
for row in richness_rows:
year_rows = grouped.get(row["year"])
if year_rows is None:
year_rows = []
grouped[row["year"]] = year_rows
year_rows.append(row)
yearly: List[dict] = []
for year, rows in grouped.items():
distinct_counts = [int(row["colors_distinct"]) for row in rows]
top3_shares = [float(row["top3_share"]) for row in rows]
average_distinct = sum(distinct_counts) / len(distinct_counts)
median_distinct = compute_median(distinct_counts)
average_top3 = sum(top3_shares) / len(top3_shares)
median_top3 = compute_median([int(share * 10000) for share in top3_shares]) / 10000
yearly.append(
{
"year": year,
"average_colors_distinct": f"{average_distinct:.2f}",
"median_colors_distinct": f"{median_distinct:.2f}",
"max_colors_distinct": str(max(distinct_counts)),
"min_colors_distinct": str(min(distinct_counts)),
"average_top3_share": f"{average_top3:.4f}",
"median_top3_share": f"{median_top3:.4f}",
}
)
yearly.sort(key=lambda row: int(row["year"]))
return yearly
def write_richness_by_set(destination_path: Path, rows: Sequence[dict]) -> None:
"""Écrit le CSV des métriques par set."""
ensure_parent_dir(destination_path)
fieldnames = [
"set_num",
"set_id",
"name",
"year",
"in_collection",
"colors_distinct",
"colors_minifig",
"colors_non_minifig",
"total_parts_non_spare",
"top_color_name",
"top_color_share",
"top3_share",
]
with destination_path.open("w", newline="") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
writer.writeheader()
for row in rows:
writer.writerow(row)
def write_richness_by_year(destination_path: Path, rows: Sequence[dict]) -> None:
"""Écrit le CSV agrégé par année."""
ensure_parent_dir(destination_path)
fieldnames = [
"year",
"average_colors_distinct",
"median_colors_distinct",
"max_colors_distinct",
"min_colors_distinct",
"average_top3_share",
"median_top3_share",
]
with destination_path.open("w", newline="") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
writer.writeheader()
for row in rows:
writer.writerow(row)