"""Extraction des couleurs de têtes de minifigs.""" import csv from pathlib import Path from typing import Dict, Iterable, List, Set, Tuple from lib.rebrickable.colors_by_set import build_colors_lookup from lib.rebrickable.stats import read_rows HEAD_CATEGORIES = {"59"} def load_parts_filtered(path: Path) -> List[dict]: """Charge parts_filtered.csv en mémoire.""" return read_rows(path) def build_head_part_set(parts_catalog_path: Path) -> Set[str]: """Sélectionne les références de têtes via leur catégorie.""" head_parts: Set[str] = set() with parts_catalog_path.open() as parts_file: reader = csv.DictReader(parts_file) for row in reader: if row["part_cat_id"] in HEAD_CATEGORIES: head_parts.add(row["part_num"]) return head_parts def aggregate_head_colors_by_set( parts_rows: Iterable[dict], head_parts: Set[str], colors_lookup: Dict[Tuple[str, str], str], ) -> List[dict]: """Agrège les quantités de têtes par set et par couleur (hors rechanges).""" aggregates: Dict[Tuple[str, str, str, str], dict] = {} for row in parts_rows: if row["part_num"] not in head_parts: continue if row["is_spare"] == "true": continue key = (row["set_num"], row["set_id"], row["year"], row["color_rgb"]) existing = aggregates.get(key) if existing is None: aggregates[key] = { "set_num": row["set_num"], "set_id": row["set_id"], "year": row["year"], "color_rgb": row["color_rgb"], "is_translucent": row["is_translucent"], "color_name": colors_lookup[(row["color_rgb"], row["is_translucent"])], "quantity": 0, } existing = aggregates[key] existing["quantity"] += int(row["quantity_in_set"]) results = list(aggregates.values()) results.sort(key=lambda r: (r["set_num"], r["color_name"], r["is_translucent"])) return results def aggregate_head_colors_by_year(rows: Iterable[dict]) -> List[dict]: """Regroupe les têtes par année et par couleur.""" aggregates: Dict[Tuple[str, str, str], dict] = {} for row in rows: key = (row["year"], row["color_rgb"], row["is_translucent"]) existing = aggregates.get(key) if existing is None: aggregates[key] = { "year": row["year"], "color_rgb": row["color_rgb"], "is_translucent": row["is_translucent"], "color_name": row["color_name"], "quantity": 0, } existing = aggregates[key] existing["quantity"] += int(row["quantity"]) results = list(aggregates.values()) results.sort(key=lambda r: (int(r["year"]), r["color_name"], r["is_translucent"])) return results def write_head_colors_by_set(path: Path, rows: Iterable[dict]) -> None: """Écrit l'agrégat par set.""" fieldnames = [ "set_num", "set_id", "year", "color_rgb", "is_translucent", "color_name", "quantity", ] with path.open("w", newline="") as csv_file: writer = csv.DictWriter(csv_file, fieldnames=fieldnames) writer.writeheader() for row in rows: writer.writerow(row) def write_head_colors_by_year(path: Path, rows: Iterable[dict]) -> None: """Écrit l'agrégat par année.""" fieldnames = [ "year", "color_rgb", "is_translucent", "color_name", "quantity", ] with path.open("w", newline="") as csv_file: writer = csv.DictWriter(csv_file, fieldnames=fieldnames) writer.writeheader() for row in rows: writer.writerow(row)