"""Détection des têtes de minifigs à plusieurs visages et agrégats associés.""" import csv from pathlib import Path from typing import Dict, Iterable, List, Sequence from lib.filesystem import ensure_parent_dir from lib.rebrickable.stats import read_rows DUAL_FACE_KEYWORDS = [ "dual sided", "dual-sided", "double sided", "double-sided", "2 sided", "2-sided", "two sided", "two-sided", "dual print", "dual face", "double face", "two faces", "alt face", "alternate face", ] def load_parts_catalog(path: Path) -> Dict[str, dict]: """Indexe les pièces par référence.""" catalog: Dict[str, dict] = {} with path.open() as csv_file: reader = csv.DictReader(csv_file) for row in reader: catalog[row["part_num"]] = row return catalog def load_sets(path: Path) -> Dict[str, dict]: """Indexe les sets enrichis par set_num.""" sets: Dict[str, dict] = {} for row in read_rows(path): sets[row["set_num"]] = row return sets def detect_dual_face(name: str) -> str: """Détecte une tête dual-face via des mots-clés.""" lowered = name.lower() for keyword in DUAL_FACE_KEYWORDS: if keyword in lowered: return "true" return "false" def build_head_faces( minifigs_by_set_path: Path, parts_catalog_path: Path, sets_enriched_path: Path, ) -> List[dict]: """Construit la liste des têtes annotées selon la présence de visages multiples.""" heads = read_rows(minifigs_by_set_path) catalog = load_parts_catalog(parts_catalog_path) sets_lookup = load_sets(sets_enriched_path) annotated: List[dict] = [] for row in heads: part = catalog[row["part_num"]] set_row = sets_lookup[row["set_num"]] is_dual = detect_dual_face(part["name"]) annotated.append( { "set_num": row["set_num"], "set_id": set_row["set_id"], "year": set_row["year"], "name": set_row["name"], "in_collection": set_row["in_collection"], "part_num": row["part_num"], "part_name": part["name"], "fig_num": row["fig_num"], "known_character": row["known_character"], "gender": row["gender"], "is_dual_face": is_dual, } ) annotated.sort(key=lambda row: (row["set_num"], row["part_num"])) return annotated def aggregate_by_year(rows: Iterable[dict]) -> List[dict]: """Agrège les têtes dual-face par année.""" counts: Dict[str, dict] = {} for row in rows: year_entry = counts.get(row["year"]) if year_entry is None: year_entry = { "year": row["year"], "total_heads": 0, "dual_heads": 0, } counts[row["year"]] = year_entry year_entry["total_heads"] += 1 if row["is_dual_face"] == "true": year_entry["dual_heads"] += 1 aggregated: List[dict] = [] for year, entry in counts.items(): aggregated.append( { "year": year, "total_heads": str(entry["total_heads"]), "dual_heads": str(entry["dual_heads"]), "share_dual": f"{entry['dual_heads'] / entry['total_heads']:.4f}", } ) aggregated.sort(key=lambda row: int(row["year"])) return aggregated def aggregate_by_set(rows: Iterable[dict]) -> List[dict]: """Agrège les têtes dual-face par set.""" counts: Dict[str, dict] = {} for row in rows: entry = counts.get(row["set_num"]) if entry is None: entry = { "set_num": row["set_num"], "set_id": row["set_id"], "name": row["name"], "year": row["year"], "in_collection": row["in_collection"], "total_heads": 0, "dual_heads": 0, } counts[row["set_num"]] = entry entry["total_heads"] += 1 if row["is_dual_face"] == "true": entry["dual_heads"] += 1 aggregated: List[dict] = [] for entry in counts.values(): aggregated.append( { "set_num": entry["set_num"], "set_id": entry["set_id"], "name": entry["name"], "year": entry["year"], "in_collection": entry["in_collection"], "total_heads": str(entry["total_heads"]), "dual_heads": str(entry["dual_heads"]), "share_dual": f"{entry['dual_heads'] / entry['total_heads']:.4f}", } ) aggregated.sort(key=lambda row: (-int(row["dual_heads"]), -float(row["share_dual"]), row["set_num"])) return aggregated def aggregate_by_character(rows: Iterable[dict]) -> List[dict]: """Agrège les têtes dual-face par personnage connu.""" counts: Dict[str, dict] = {} for row in rows: character = row["known_character"] or "Inconnu" entry = counts.get(character) if entry is None: entry = { "known_character": character, "gender": row["gender"], "total_heads": 0, "dual_heads": 0, } counts[character] = entry entry["total_heads"] += 1 if row["is_dual_face"] == "true": entry["dual_heads"] += 1 aggregated: List[dict] = [] for character, entry in counts.items(): aggregated.append( { "known_character": character, "gender": entry["gender"], "total_heads": str(entry["total_heads"]), "dual_heads": str(entry["dual_heads"]), "share_dual": f"{entry['dual_heads'] / entry['total_heads']:.4f}", } ) aggregated.sort(key=lambda row: (-int(row["dual_heads"]), row["known_character"])) return aggregated def write_csv(destination_path: Path, rows: Sequence[dict], fieldnames: Sequence[str]) -> None: """Écrit un CSV générique.""" ensure_parent_dir(destination_path) with destination_path.open("w", newline="") as csv_file: writer = csv.DictWriter(csv_file, fieldnames=fieldnames) writer.writeheader() for row in rows: writer.writerow(row)