"""Extraction des têtes de minifigs présentes dans chaque set filtré.""" import csv from pathlib import Path from typing import Dict, Iterable, List, Sequence, Set, Tuple from lib.filesystem import ensure_parent_dir from lib.rebrickable.minifig_heads import HEAD_CATEGORIES from lib.rebrickable.parts_inventory import ( index_inventory_minifigs_by_inventory, index_inventory_parts_by_inventory, normalize_boolean, select_latest_inventories, ) from lib.rebrickable.stats import read_rows KNOWN_CHARACTERS = [ "Owen Grady", "Claire Dearing", "Alan Grant", "Ellie Sattler", "Ian Malcolm", "John Hammond", "Dennis Nedry", "Ray Arnold", "Robert Muldoon", "Lex Murphy", "Tim Murphy", "Donald Gennaro", "Dr Wu", "Henry Wu", "Vic Hoskins", "Simon Masrani", "Zia Rodriguez", "Franklin Webb", "Rainn DeLaCourt", "Gunnar Eversol", "Soyona Santos", "Kayla Watts", "Maisie Lockwood", "Zach Mitchell", "Gray Mitchell", "Zach", "Gray", "Kenji", "Darius", "Yaz", "Sammy", "Brooklynn", "Sinjin Prescott", "Danny Nedermeyer", "ACU Trooper", "Hudson Harper", "Isabella Delgado", "Reuben Delgado", "Allison Miles", "Henry Loomis", "Ben", "Barry" ] def load_parts_filtered(path: Path) -> List[dict]: """Charge parts_filtered.csv en mémoire.""" return read_rows(path) def load_parts_catalog(path: Path) -> Dict[str, dict]: """Construit un index des pièces avec leur catégorie et leur nom.""" catalog: Dict[str, dict] = {} with path.open() as catalog_file: reader = csv.DictReader(catalog_file) for row in reader: catalog[row["part_num"]] = row return catalog def select_head_parts(catalog: Dict[str, dict]) -> Set[str]: """Sélectionne les références de têtes via leur catégorie.""" return {part_num for part_num, row in catalog.items() if row["part_cat_id"] in HEAD_CATEGORIES} def load_minifig_catalog(path: Path) -> Dict[str, dict]: """Construit un index des minifigs par identifiant.""" catalog: Dict[str, dict] = {} with path.open() as minifigs_file: reader = csv.DictReader(minifigs_file) for row in reader: catalog[row["fig_num"]] = row return catalog def extract_character_name(part_name: str) -> str: """Extrait un nom probable de personnage depuis le nom de pièce.""" cleaned = part_name prefix = "Minifig Head" if cleaned.startswith(prefix): cleaned = cleaned[len(prefix) :] comma_index = cleaned.find(",") if comma_index != -1: cleaned = cleaned[:comma_index] slash_index = cleaned.find("/") if slash_index != -1: cleaned = cleaned[:slash_index] stripped = cleaned.strip() if stripped == "": return "Inconnu" return stripped def select_known_character(extracted_name: str) -> str: """Associe un personnage connu si le nom extrait correspond à la liste des jalons.""" lowered = extracted_name.lower() for character in KNOWN_CHARACTERS: if character.lower() == lowered: return character return "" def load_aliases(path: Path) -> Dict[str, str]: """Charge les correspondances alias -> nom canonique.""" aliases: Dict[str, str] = {} with path.open() as alias_file: reader = csv.DictReader(alias_file) for row in reader: aliases[row["alias"].lower()] = row["canonical"] return aliases def normalize_known_character(raw_known: str, extracted_name: str, aliases: Dict[str, str]) -> str: """Nettoie et mappe un nom vers une version canonique.""" base = raw_known or extracted_name if base == "Inconnu": base = "" base = base.strip() if base == "": return "" if "," in base: base = base.split(",", 1)[0] if "/" in base: base = base.split("/", 1)[0] cleaned = base.strip() lowered_cleaned = cleaned.lower() for alias, canonical in aliases.items(): if lowered_cleaned == alias or lowered_cleaned.startswith(alias): return canonical for character in KNOWN_CHARACTERS: lowered = character.lower() if lowered_cleaned == lowered: return character if lowered_cleaned.startswith(f"{lowered} "): return character if lowered_cleaned.startswith(f"{lowered}'"): return character return cleaned def build_set_minifigs_lookup( inventories: Dict[str, dict], inventory_minifigs_path: Path, ) -> Dict[str, List[str]]: """Associe les sets à leurs minifigs via l'inventaire.""" minifigs_by_inventory = index_inventory_minifigs_by_inventory(inventory_minifigs_path) lookup: Dict[str, List[str]] = {} for set_num, inventory in inventories.items(): lookup[set_num] = [row["fig_num"] for row in minifigs_by_inventory.get(inventory["id"], [])] return lookup def build_minifig_heads_lookup( minifig_catalog: Dict[str, dict], inventories: Dict[str, dict], inventory_parts_path: Path, head_parts: Set[str], ) -> Dict[str, Set[str]]: """Indexe les têtes présentes dans chaque minifig (hors rechanges).""" parts_by_inventory = index_inventory_parts_by_inventory(inventory_parts_path) heads_by_minifig: Dict[str, Set[str]] = {} for fig_num in minifig_catalog: inventory = inventories.get(fig_num) if inventory is None: continue heads: Set[str] = set() for part_row in parts_by_inventory.get(inventory["id"], []): if part_row["part_num"] not in head_parts: continue if normalize_boolean(part_row["is_spare"]) == "true": continue heads.add(part_row["part_num"]) if heads: heads_by_minifig[fig_num] = heads return heads_by_minifig def aggregate_heads_by_set( parts_rows: Iterable[dict], catalog: Dict[str, dict], head_parts: Set[str], set_minifigs: Dict[str, List[str]], minifig_heads: Dict[str, Set[str]], minifig_catalog: Dict[str, dict], aliases: Dict[str, str], ) -> List[dict]: """Agrège les têtes de minifigs par set en éliminant les rechanges et doublons.""" seen: Set[Tuple[str, str]] = set() heads: List[dict] = [] for row in parts_rows: if row["part_num"] not in head_parts: continue if row["is_spare"] == "true": continue key = (row["set_num"], row["part_num"]) if key in seen: continue part = catalog[row["part_num"]] extracted = extract_character_name(part["name"]) possible_figs = [ fig_num for fig_num in set_minifigs.get(row["set_num"], []) if row["part_num"] in minifig_heads.get(fig_num, set()) ] known_character = "" matched_fig = "" if len(possible_figs) == 1: matched_fig = possible_figs[0] known_character = minifig_catalog.get(matched_fig, {}).get("name", "") if known_character == "": known_character = select_known_character(extracted) normalized = normalize_known_character(known_character, extracted, aliases) if matched_fig == "": continue heads.append( { "set_num": row["set_num"], "part_num": row["part_num"], "known_character": normalized, "fig_num": matched_fig, } ) seen.add(key) heads.sort(key=lambda row: (row["set_num"], row["part_num"])) return heads def write_heads_by_set(destination_path: Path, rows: Sequence[dict]) -> None: """Écrit le CSV intermédiaire listant les têtes de minifigs par set.""" ensure_parent_dir(destination_path) fieldnames = ["set_num", "part_num", "known_character", "fig_num"] with destination_path.open("w", newline="") as csv_file: writer = csv.DictWriter(csv_file, fieldnames=fieldnames) writer.writeheader() for row in rows: writer.writerow(row) def build_minifigs_by_set( parts_filtered_path: Path, parts_catalog_path: Path, inventories_path: Path, inventory_parts_path: Path, inventory_minifigs_path: Path, minifigs_path: Path, aliases_path: Path, destination_path: Path, ) -> None: """Construit le CSV listant les têtes de minifigs présentes par set.""" parts_rows = load_parts_filtered(parts_filtered_path) parts_catalog = load_parts_catalog(parts_catalog_path) head_parts = select_head_parts(parts_catalog) latest_inventories = select_latest_inventories(inventories_path) minifig_catalog = load_minifig_catalog(minifigs_path) minifig_heads = build_minifig_heads_lookup(minifig_catalog, latest_inventories, inventory_parts_path, head_parts) set_minifigs = build_set_minifigs_lookup(latest_inventories, inventory_minifigs_path) aliases = load_aliases(aliases_path) heads = aggregate_heads_by_set( parts_rows, parts_catalog, head_parts, set_minifigs, minifig_heads, minifig_catalog, aliases, ) write_heads_by_set(destination_path, heads)