"""Extraction des couleurs de têtes de minifigs sur le catalogue complet.""" import csv from pathlib import Path from typing import Dict, Iterable, List, Set, Tuple from lib.rebrickable.parts_inventory import normalize_boolean, select_latest_inventories HEAD_CATEGORIES = {"59"} def load_head_parts(parts_path: Path, head_categories: Set[str] | None = None) -> Set[str]: """Construit l'ensemble des références de têtes via leur catégorie.""" categories = head_categories or HEAD_CATEGORIES head_parts: Set[str] = set() with parts_path.open() as parts_file: reader = csv.DictReader(parts_file) for row in reader: if row["part_cat_id"] in categories: head_parts.add(row["part_num"]) return head_parts def build_sets_year_lookup(sets_path: Path) -> Dict[str, str]: """Indexe les années par set_num.""" lookup: Dict[str, str] = {} with sets_path.open() as sets_file: reader = csv.DictReader(sets_file) for row in reader: lookup[row["set_num"]] = row["year"] return lookup def build_color_lookup(colors_path: Path) -> Dict[str, dict]: """Construit un index des couleurs par identifiant.""" lookup: Dict[str, dict] = {} with colors_path.open() as colors_file: reader = csv.DictReader(colors_file) for row in reader: lookup[row["id"]] = { "rgb": row["rgb"], "is_translucent": row["is_trans"].lower(), "name": row["name"], } return lookup def aggregate_global_heads_by_year( inventories_path: Path, inventory_parts_path: Path, parts_path: Path, colors_path: Path, sets_path: Path, head_categories: Set[str] | None = None, ) -> List[dict]: """Agrège les couleurs de têtes par année sur le catalogue complet.""" head_parts = load_head_parts(parts_path, head_categories) latest_inventories = select_latest_inventories(inventories_path) latest_inventory_ids = {data["id"]: set_num for set_num, data in latest_inventories.items()} colors_lookup = build_color_lookup(colors_path) sets_year = build_sets_year_lookup(sets_path) aggregates: Dict[Tuple[str, str, str], dict] = {} with inventory_parts_path.open() as parts_file: reader = csv.DictReader(parts_file) for row in reader: inventory_id = row["inventory_id"] if inventory_id not in latest_inventory_ids: continue if row["part_num"] not in head_parts: continue if normalize_boolean(row["is_spare"]) == "true": continue set_num = latest_inventory_ids[inventory_id] year = sets_year.get(set_num) if year is None: continue color = colors_lookup[row["color_id"]] key = (year, color["rgb"], color["is_translucent"]) existing = aggregates.get(key) if existing is None: aggregates[key] = { "year": year, "color_rgb": color["rgb"], "is_translucent": color["is_translucent"], "color_name": color["name"], "quantity": 0, } existing = aggregates[key] existing["quantity"] += int(row["quantity"]) results = list(aggregates.values()) results.sort(key=lambda r: (int(r["year"]), r["color_name"], r["is_translucent"])) return results def write_global_heads_by_year(destination_path: Path, rows: Iterable[dict]) -> None: """Sérialise l'agrégat global par année.""" fieldnames = ["year", "color_rgb", "is_translucent", "color_name", "quantity"] with destination_path.open("w", newline="") as csv_file: writer = csv.DictWriter(csv_file, fieldnames=fieldnames) writer.writeheader() for row in rows: writer.writerow(row)