1

Normalise les minifigs anonymes en figurants

This commit is contained in:
2025-12-02 01:47:51 +01:00
parent 2cf7e063fe
commit 6186a5be4f
5 changed files with 298 additions and 10 deletions

View File

@@ -6,9 +6,61 @@ from typing import Dict, Iterable, List, Sequence, Set, Tuple
from lib.filesystem import ensure_parent_dir
from lib.rebrickable.minifig_heads import HEAD_CATEGORIES
from lib.rebrickable.parts_inventory import (
index_inventory_minifigs_by_inventory,
index_inventory_parts_by_inventory,
normalize_boolean,
select_latest_inventories,
)
from lib.rebrickable.stats import read_rows
# Milestone character names (apparently Jurassic Park / Jurassic World cast —
# confirm against the dataset) used to recognise named minifigs.
# Matching is case-insensitive: exact match in select_known_character, and
# exact-or-prefix match in normalize_known_character.  Since the first entry
# that matches wins, longer names must precede their short variants
# (e.g. "Zach Mitchell" before "Zach", "Gray Mitchell" before "Gray").
KNOWN_CHARACTERS = [
    "Owen Grady",
    "Claire Dearing",
    "Alan Grant",
    "Ellie Sattler",
    "Ian Malcolm",
    "John Hammond",
    "Dennis Nedry",
    "Ray Arnold",
    "Robert Muldoon",
    "Lex Murphy",
    "Tim Murphy",
    "Donald Gennaro",
    "Dr Wu",
    "Henry Wu",
    "Vic Hoskins",
    "Simon Masrani",
    "Zia Rodriguez",
    "Franklin Webb",
    "Rainn DeLaCourt",
    "Gunnar Eversol",
    "Soyona Santos",
    "Kayla Watts",
    "Maisie Lockwood",
    "Zach Mitchell",
    "Gray Mitchell",
    "Zach",
    "Gray",
    "Kenji",
    "Darius",
    "Yaz",
    "Sammy",
    "Brooklynn",
    "Sinjin Prescott",
    "Danny Nedermeyer",
    "ACU Trooper",
    "Hudson Harper",
    "Isabella Delgado",
    "Reuben Delgado",
    "Allison Miles",
    "Henry Loomis",
    "Ben",
    "Barry"
]
def load_parts_filtered(path: Path) -> List[dict]:
    """Read parts_filtered.csv fully into memory and return its rows."""
    rows = read_rows(path)
    return rows
@@ -29,10 +81,126 @@ def select_head_parts(catalog: Dict[str, dict]) -> Set[str]:
return {part_num for part_num, row in catalog.items() if row["part_cat_id"] in HEAD_CATEGORIES}
def load_minifig_catalog(path: Path) -> Dict[str, dict]:
    """Index the minifig CSV rows by their ``fig_num`` identifier."""
    with path.open() as handle:
        # Later duplicates overwrite earlier ones, as with incremental insertion.
        return {row["fig_num"]: row for row in csv.DictReader(handle)}
def extract_character_name(part_name: str) -> str:
    """Extract a probable character name from a part name.

    Drops the leading "Minifig Head" marker, truncates at the first comma,
    then at the first slash, and falls back to "Inconnu" when nothing remains.
    """
    marker = "Minifig Head"
    candidate = part_name[len(marker):] if part_name.startswith(marker) else part_name
    for separator in (",", "/"):
        candidate = candidate.split(separator, 1)[0]
    candidate = candidate.strip()
    return candidate or "Inconnu"
def select_known_character(extracted_name: str) -> str:
    """Return the milestone character matching the extracted name, else ""."""
    target = extracted_name.lower()
    matches = (character for character in KNOWN_CHARACTERS if character.lower() == target)
    return next(matches, "")
def load_aliases(path: Path) -> Dict[str, str]:
    """Load alias -> canonical-name mappings, keyed by lowercase alias."""
    with path.open() as handle:
        return {row["alias"].lower(): row["canonical"] for row in csv.DictReader(handle)}
def normalize_known_character(raw_known: str, extracted_name: str, aliases: Dict[str, str]) -> str:
    """Clean a raw name and map it to its canonical version.

    Prefers ``raw_known`` over ``extracted_name``, discards the "Inconnu"
    placeholder, truncates at commas/slashes, then resolves through the alias
    table first (prefix match, insertion order) and the milestone character
    list second.  Returns the cleaned name unchanged when nothing matches.
    """
    candidate = raw_known or extracted_name
    if candidate == "Inconnu":
        candidate = ""
    candidate = candidate.strip()
    if not candidate:
        return ""
    cleaned = candidate.split(",", 1)[0].split("/", 1)[0].strip()
    key = cleaned.lower()
    # First alias whose text prefixes the name wins (dict insertion order).
    for alias, canonical in aliases.items():
        if key.startswith(alias):
            return canonical
    for character in KNOWN_CHARACTERS:
        reference = character.lower()
        # Exact match, or the character name followed by a space/apostrophe.
        if key == reference or key.startswith((f"{reference} ", f"{reference}'")):
            return character
    return cleaned
def build_set_minifigs_lookup(
    inventories: Dict[str, dict],
    inventory_minifigs_path: Path,
) -> Dict[str, List[str]]:
    """Associate each set with the minifigs of its inventory."""
    rows_by_inventory = index_inventory_minifigs_by_inventory(inventory_minifigs_path)
    return {
        set_num: [entry["fig_num"] for entry in rows_by_inventory.get(inventory["id"], [])]
        for set_num, inventory in inventories.items()
    }
def build_minifig_heads_lookup(
    minifig_catalog: Dict[str, dict],
    inventories: Dict[str, dict],
    inventory_parts_path: Path,
    head_parts: Set[str],
) -> Dict[str, Set[str]]:
    """Index the head parts contained in each minifig (spares excluded).

    Minifigs without an inventory, or whose inventory holds no head part,
    are left out of the returned mapping.
    """
    parts_by_inventory = index_inventory_parts_by_inventory(inventory_parts_path)
    lookup: Dict[str, Set[str]] = {}
    for fig_num in minifig_catalog:
        inventory = inventories.get(fig_num)
        if inventory is None:
            continue
        entries = parts_by_inventory.get(inventory["id"], [])
        heads = {
            entry["part_num"]
            for entry in entries
            if entry["part_num"] in head_parts
            and normalize_boolean(entry["is_spare"]) != "true"
        }
        if heads:
            lookup[fig_num] = heads
    return lookup
def aggregate_heads_by_set(
parts_rows: Iterable[dict],
catalog: Dict[str, dict],
head_parts: Set[str],
set_minifigs: Dict[str, List[str]],
minifig_heads: Dict[str, Set[str]],
minifig_catalog: Dict[str, dict],
aliases: Dict[str, str],
) -> List[dict]:
"""Agrège les têtes de minifigs par set en éliminant les rechanges et doublons."""
seen: Set[Tuple[str, str]] = set()
@@ -46,11 +214,26 @@ def aggregate_heads_by_set(
if key in seen:
continue
part = catalog[row["part_num"]]
extracted = extract_character_name(part["name"])
possible_figs = [
fig_num for fig_num in set_minifigs.get(row["set_num"], []) if row["part_num"] in minifig_heads.get(fig_num, set())
]
known_character = ""
matched_fig = ""
if len(possible_figs) == 1:
matched_fig = possible_figs[0]
known_character = minifig_catalog.get(matched_fig, {}).get("name", "")
if known_character == "":
known_character = select_known_character(extracted)
normalized = normalize_known_character(known_character, extracted, aliases)
if matched_fig == "":
continue
heads.append(
{
"set_num": row["set_num"],
"part_num": row["part_num"],
"part_name": part["name"],
"known_character": normalized,
"fig_num": matched_fig,
}
)
seen.add(key)
@@ -61,7 +244,7 @@ def aggregate_heads_by_set(
def write_heads_by_set(destination_path: Path, rows: Sequence[dict]) -> None:
"""Écrit le CSV intermédiaire listant les têtes de minifigs par set."""
ensure_parent_dir(destination_path)
fieldnames = ["set_num", "part_num", "part_name"]
fieldnames = ["set_num", "part_num", "known_character", "fig_num"]
with destination_path.open("w", newline="") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
writer.writeheader()
@@ -72,11 +255,29 @@ def write_heads_by_set(destination_path: Path, rows: Sequence[dict]) -> None:
def build_minifigs_by_set(
    parts_filtered_path: Path,
    parts_catalog_path: Path,
    inventories_path: Path,
    inventory_parts_path: Path,
    inventory_minifigs_path: Path,
    minifigs_path: Path,
    aliases_path: Path,
    destination_path: Path,
) -> None:
    """Build the CSV listing the minifig heads present in each set.

    Loads the filtered parts, the part catalog, the latest inventories, the
    minifig catalog and the alias table, aggregates the head parts per set
    and writes the result to ``destination_path``.
    """
    parts_rows = load_parts_filtered(parts_filtered_path)
    parts_catalog = load_parts_catalog(parts_catalog_path)
    head_parts = select_head_parts(parts_catalog)
    # BUG FIX: a stale `heads = aggregate_heads_by_set(parts_rows, parts_catalog,
    # head_parts)` call sat here — old 3-argument arity (TypeError against the
    # current 7-parameter signature) and its result was discarded anyway.
    # Lookups needed to attribute each head to a specific minifig.
    latest_inventories = select_latest_inventories(inventories_path)
    minifig_catalog = load_minifig_catalog(minifigs_path)
    minifig_heads = build_minifig_heads_lookup(minifig_catalog, latest_inventories, inventory_parts_path, head_parts)
    set_minifigs = build_set_minifigs_lookup(latest_inventories, inventory_minifigs_path)
    aliases = load_aliases(aliases_path)
    heads = aggregate_heads_by_set(
        parts_rows,
        parts_catalog,
        head_parts,
        set_minifigs,
        minifig_heads,
        minifig_catalog,
        aliases,
    )
    write_heads_by_set(destination_path, heads)