"""Mesure la réutilisation des têtes de minifigs dans le catalogue LEGO.""" import csv from collections import defaultdict from pathlib import Path from typing import Dict, Iterable, List, Sequence, Set from lib.filesystem import ensure_parent_dir from lib.rebrickable.minifig_character_sets import load_sets from lib.rebrickable.minifigs_by_set import load_parts_catalog, select_head_parts from lib.rebrickable.parts_inventory import ( index_inventory_parts_by_inventory, normalize_boolean, select_latest_inventories, ) from lib.rebrickable.stats import read_rows def load_minifigs_by_set(path: Path) -> List[dict]: """Charge le CSV minifigs_by_set.""" return read_rows(path) def build_head_presence( inventories_path: Path, inventory_parts_path: Path, head_parts: Set[str], ) -> Dict[str, Set[str]]: """Indexe les sets contenant chaque tête (rechanges exclues).""" inventories = select_latest_inventories(inventories_path) parts_by_inventory = index_inventory_parts_by_inventory(inventory_parts_path) presence: Dict[str, Set[str]] = {} for set_num, inventory in inventories.items(): parts = parts_by_inventory.get(inventory["id"], []) for part_row in parts: if part_row["part_num"] not in head_parts: continue if normalize_boolean(part_row["is_spare"]) == "true": continue existing = presence.get(part_row["part_num"]) if existing is None: existing = set() presence[part_row["part_num"]] = existing existing.add(set_num) return presence def build_filtered_presence(minifigs_rows: Iterable[dict]) -> Dict[str, Set[str]]: """Indexe les sets filtrés contenant chaque tête (hors figurants).""" presence: Dict[str, Set[str]] = {} for row in minifigs_rows: if row["known_character"] == "Figurant": continue bucket = presence.get(row["part_num"]) if bucket is None: bucket = set() presence[row["part_num"]] = bucket bucket.add(row["set_num"]) return presence def build_character_labels(minifigs_rows: Iterable[dict]) -> Dict[str, str]: """Associe à chaque tête un personnage représentatif (hors figurants).""" labels: Dict[str, Set[str]] = defaultdict(set) for row in minifigs_rows: character = row["known_character"] if character == "Figurant": continue labels[row["part_num"]].add(character) representative: Dict[str, str] = {} for part_num, characters in labels.items(): representative[part_num] = sorted(characters)[0] return representative def aggregate_head_reuse( minifigs_rows: Iterable[dict], parts_catalog: Dict[str, dict], head_presence: Dict[str, Set[str]], sets_lookup: Dict[str, dict], ) -> List[dict]: """Construit le tableau des têtes présentes dans les sets filtrés avec leur réutilisation globale.""" filtered_presence = build_filtered_presence(minifigs_rows) labels = build_character_labels(minifigs_rows) aggregates: List[dict] = [] for part_num, filtered_sets in filtered_presence.items(): all_sets = set(head_presence.get(part_num, set())) all_sets.update(filtered_sets) other_sets = all_sets - filtered_sets sample_set = sorted(filtered_sets)[0] sample_set_id = sets_lookup.get(sample_set, {}).get("set_id", sample_set.split("-")[0]) aggregates.append( { "part_num": part_num, "part_name": parts_catalog[part_num]["name"], "known_character": labels.get(part_num, ""), "sample_set_id": sample_set_id, "filtered_sets": str(len(filtered_sets)), "other_sets": str(len(other_sets)), "total_sets": str(len(all_sets)), } ) aggregates.sort(key=lambda row: (int(row["other_sets"]), -int(row["filtered_sets"]), row["part_num"])) return aggregates def write_head_reuse(destination_path: Path, rows: Sequence[dict]) -> None: """Écrit le CSV des usages de têtes filtrées vs reste du catalogue.""" ensure_parent_dir(destination_path) fieldnames = ["part_num", "part_name", "known_character", "sample_set_id", "filtered_sets", "other_sets", "total_sets"] with destination_path.open("w", newline="") as csv_file: writer = csv.DictWriter(csv_file, fieldnames=fieldnames) writer.writeheader() for row in rows: writer.writerow(row)