1

Ajoute les agrégats et visualisations globales des couleurs de têtes

This commit is contained in:
2025-12-01 23:56:03 +01:00
parent ba76030d36
commit b271e0ebdd
12 changed files with 502 additions and 0 deletions

View File

@@ -0,0 +1,103 @@
"""Extraction des couleurs de têtes de minifigs sur le catalogue complet."""
import csv
from pathlib import Path
from typing import Dict, Iterable, List, Set, Tuple
from lib.rebrickable.parts_inventory import normalize_boolean, select_latest_inventories
HEAD_CATEGORIES = {"59"}
def load_head_parts(parts_path: Path, head_categories: Set[str] | None = None) -> Set[str]:
"""Construit l'ensemble des références de têtes via leur catégorie."""
categories = head_categories or HEAD_CATEGORIES
head_parts: Set[str] = set()
with parts_path.open() as parts_file:
reader = csv.DictReader(parts_file)
for row in reader:
if row["part_cat_id"] in categories:
head_parts.add(row["part_num"])
return head_parts
def build_sets_year_lookup(sets_path: Path) -> Dict[str, str]:
"""Indexe les années par set_num."""
lookup: Dict[str, str] = {}
with sets_path.open() as sets_file:
reader = csv.DictReader(sets_file)
for row in reader:
lookup[row["set_num"]] = row["year"]
return lookup
def build_color_lookup(colors_path: Path) -> Dict[str, dict]:
"""Construit un index des couleurs par identifiant."""
lookup: Dict[str, dict] = {}
with colors_path.open() as colors_file:
reader = csv.DictReader(colors_file)
for row in reader:
lookup[row["id"]] = {
"rgb": row["rgb"],
"is_translucent": row["is_trans"].lower(),
"name": row["name"],
}
return lookup
def aggregate_global_heads_by_year(
inventories_path: Path,
inventory_parts_path: Path,
parts_path: Path,
colors_path: Path,
sets_path: Path,
head_categories: Set[str] | None = None,
) -> List[dict]:
"""Agrège les couleurs de têtes par année sur le catalogue complet."""
head_parts = load_head_parts(parts_path, head_categories)
latest_inventories = select_latest_inventories(inventories_path)
latest_inventory_ids = {data["id"]: set_num for set_num, data in latest_inventories.items()}
colors_lookup = build_color_lookup(colors_path)
sets_year = build_sets_year_lookup(sets_path)
aggregates: Dict[Tuple[str, str, str], dict] = {}
with inventory_parts_path.open() as parts_file:
reader = csv.DictReader(parts_file)
for row in reader:
inventory_id = row["inventory_id"]
if inventory_id not in latest_inventory_ids:
continue
if row["part_num"] not in head_parts:
continue
if normalize_boolean(row["is_spare"]) == "true":
continue
set_num = latest_inventory_ids[inventory_id]
year = sets_year.get(set_num)
if year is None:
continue
color = colors_lookup[row["color_id"]]
key = (year, color["rgb"], color["is_translucent"])
existing = aggregates.get(key)
if existing is None:
aggregates[key] = {
"year": year,
"color_rgb": color["rgb"],
"is_translucent": color["is_translucent"],
"color_name": color["name"],
"quantity": 0,
}
existing = aggregates[key]
existing["quantity"] += int(row["quantity"])
results = list(aggregates.values())
results.sort(key=lambda r: (int(r["year"]), r["color_name"], r["is_translucent"]))
return results
def write_global_heads_by_year(destination_path: Path, rows: Iterable[dict]) -> None:
"""Sérialise l'agrégat global par année."""
fieldnames = ["year", "color_rgb", "is_translucent", "color_name", "quantity"]
with destination_path.open("w", newline="") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
writer.writeheader()
for row in rows:
writer.writerow(row)