1
etude_lego_jurassic_world/lib/rebrickable/minifig_head_faces.py

195 lines
6.3 KiB
Python

"""Détection des têtes de minifigs à plusieurs visages et agrégats associés."""
import csv
from pathlib import Path
from typing import Dict, Iterable, List, Sequence
from lib.filesystem import ensure_parent_dir
from lib.rebrickable.stats import read_rows
# Lowercase phrases searched for, as plain substrings, in a lowercased part
# name by detect_dual_face to decide whether a head carries several faces.
DUAL_FACE_KEYWORDS = [
    # "dual" / "double" sided, with and without a hyphen
    "dual sided",
    "dual-sided",
    "double sided",
    "double-sided",
    # numeric and spelled-out "two sided"
    "2 sided",
    "2-sided",
    "two sided",
    "two-sided",
    # wording based on "print" or "face" rather than "sided"
    "dual print",
    "dual face",
    "double face",
    "two faces",
    "alt face",
    "alternate face",
]
def load_parts_catalog(path: Path) -> Dict[str, dict]:
    """Index the rows of a parts catalog CSV by part number.

    Args:
        path: CSV file whose header row includes a ``part_num`` column.

    Returns:
        A mapping from each ``part_num`` value to its full row (column name
        to cell text). When a part number appears several times, the last
        row read replaces the earlier ones.

    Raises:
        KeyError: If the rows carry no ``part_num`` column.
    """
    # The csv module expects its input to be opened with newline="" so that it
    # handles line endings itself; without it, universal-newline translation
    # rewrites "\r\n" sequences embedded in quoted fields.
    # NOTE(review): the encoding is left at the platform default, as before;
    # confirm the catalog's encoding before pinning one.
    with path.open(newline="") as csv_file:
        return {row["part_num"]: row for row in csv.DictReader(csv_file)}
def load_sets(path: Path) -> Dict[str, dict]:
    """Index the rows returned by ``read_rows(path)`` by their ``set_num``.

    A later row with the same ``set_num`` replaces an earlier one.
    """
    return {entry["set_num"]: entry for entry in read_rows(path)}
def detect_dual_face(name: str) -> str:
    """Tell whether a part name mentions one of the dual-face keywords.

    The name is lowercased, then searched for every entry of
    ``DUAL_FACE_KEYWORDS`` as a plain substring.

    Returns:
        The string ``"true"`` when at least one keyword is found, otherwise
        the string ``"false"`` (text flags, not booleans).
    """
    haystack = name.lower()
    has_keyword = any(keyword in haystack for keyword in DUAL_FACE_KEYWORDS)
    return "true" if has_keyword else "false"
def build_head_faces(
    minifigs_by_set_path: Path,
    parts_catalog_path: Path,
    sets_enriched_path: Path,
) -> List[dict]:
    """Build the list of head rows annotated with a dual-face flag.

    Every row read from ``minifigs_by_set_path`` is joined with its part (via
    ``part_num``) and its set (via ``set_num``), then flagged by running
    ``detect_dual_face`` on the part name.

    Returns:
        One dict per head row, ordered by ``(set_num, part_num)``.

    Raises:
        KeyError: If a row references a part or a set missing from the
            lookups, or if an expected column is absent.
    """
    head_rows = read_rows(minifigs_by_set_path)
    parts_by_num = load_parts_catalog(parts_catalog_path)
    sets_by_num = load_sets(sets_enriched_path)

    records: List[dict] = []
    for head in head_rows:
        # Resolve the part first, then the set, so a dangling reference
        # fails on the same lookup as before.
        part_info = parts_by_num[head["part_num"]]
        set_info = sets_by_num[head["set_num"]]
        dual_flag = detect_dual_face(part_info["name"])
        record = {
            "set_num": head["set_num"],
            "set_id": set_info["set_id"],
            "year": set_info["year"],
            "name": set_info["name"],
            "in_collection": set_info["in_collection"],
            "part_num": head["part_num"],
            "part_name": part_info["name"],
            "fig_num": head["fig_num"],
            "known_character": head["known_character"],
            "gender": head["gender"],
            "is_dual_face": dual_flag,
        }
        records.append(record)

    return sorted(records, key=lambda record: (record["set_num"], record["part_num"]))
def aggregate_by_year(rows: Iterable[dict]) -> List[dict]:
    """Summarise head counts and dual-face counts for each year.

    Args:
        rows: Dicts carrying at least ``year`` and ``is_dual_face``; a head
            counts as dual-face only when the flag equals ``"true"``.

    Returns:
        One dict per year with ``year``, ``total_heads`` and ``dual_heads``
        (as strings) and ``share_dual`` (the ratio, four decimals), sorted by
        the integer value of the year.

    Raises:
        ValueError: If a year cannot be converted with ``int`` when sorting.
    """
    tallies: Dict[str, dict] = {}
    for head in rows:
        year = head["year"]
        tally = tallies.setdefault(
            year,
            {"year": year, "total_heads": 0, "dual_heads": 0},
        )
        tally["total_heads"] += 1
        if head["is_dual_face"] == "true":
            tally["dual_heads"] += 1

    summary: List[dict] = [
        {
            "year": year,
            "total_heads": str(tally["total_heads"]),
            "dual_heads": str(tally["dual_heads"]),
            "share_dual": f"{tally['dual_heads'] / tally['total_heads']:.4f}",
        }
        for year, tally in tallies.items()
    ]
    return sorted(summary, key=lambda item: int(item["year"]))
def aggregate_by_set(rows: Iterable[dict]) -> List[dict]:
    """Summarise head counts and dual-face counts for each set.

    Args:
        rows: Dicts carrying ``set_num``, ``set_id``, ``name``, ``year``,
            ``in_collection`` and ``is_dual_face``. The descriptive fields
            are taken from the first row seen for a given ``set_num``.

    Returns:
        One dict per set with the descriptive fields, ``total_heads`` and
        ``dual_heads`` (as strings) and ``share_dual`` (four decimals). Sets
        are ordered by dual-head count descending, then share descending,
        then ``set_num`` ascending.
    """
    per_set: Dict[str, dict] = {}
    for head in rows:
        set_num = head["set_num"]
        if set_num not in per_set:
            per_set[set_num] = {
                "set_num": set_num,
                "set_id": head["set_id"],
                "name": head["name"],
                "year": head["year"],
                "in_collection": head["in_collection"],
                "total_heads": 0,
                "dual_heads": 0,
            }
        tally = per_set[set_num]
        tally["total_heads"] += 1
        if head["is_dual_face"] == "true":
            tally["dual_heads"] += 1

    summary: List[dict] = []
    for tally in per_set.values():
        total = tally["total_heads"]
        dual = tally["dual_heads"]
        # Unpacking keeps the descriptive fields in their original order and
        # overwrites the two integer counters with their text form.
        summary.append(
            {
                **tally,
                "total_heads": str(total),
                "dual_heads": str(dual),
                "share_dual": f"{dual / total:.4f}",
            }
        )

    def ranking(item: dict):
        # Negated numbers give descending order under an ascending sort.
        return (-int(item["dual_heads"]), -float(item["share_dual"]), item["set_num"])

    return sorted(summary, key=ranking)
def aggregate_by_character(rows: Iterable[dict]) -> List[dict]:
    """Summarise head counts and dual-face counts for each known character.

    Args:
        rows: Dicts carrying ``known_character``, ``gender`` and
            ``is_dual_face``. An empty or missing character name is grouped
            under ``"Inconnu"``; the gender kept for a character is the one
            from the first row seen for it.

    Returns:
        One dict per character with ``known_character``, ``gender``,
        ``total_heads`` and ``dual_heads`` (as strings) and ``share_dual``
        (four decimals), ordered by dual-head count descending, then by
        character name ascending.
    """
    per_character: Dict[str, dict] = {}
    for head in rows:
        label = head["known_character"] or "Inconnu"
        if label not in per_character:
            per_character[label] = {
                "known_character": label,
                "gender": head["gender"],
                "total_heads": 0,
                "dual_heads": 0,
            }
        tally = per_character[label]
        tally["total_heads"] += 1
        if head["is_dual_face"] == "true":
            tally["dual_heads"] += 1

    summary: List[dict] = [
        {
            "known_character": label,
            "gender": tally["gender"],
            "total_heads": str(tally["total_heads"]),
            "dual_heads": str(tally["dual_heads"]),
            "share_dual": f"{tally['dual_heads'] / tally['total_heads']:.4f}",
        }
        for label, tally in per_character.items()
    ]

    def ranking(item: dict):
        # Negated count gives descending order under an ascending sort.
        return (-int(item["dual_heads"]), item["known_character"])

    return sorted(summary, key=ranking)
def write_csv(destination_path: Path, rows: Sequence[dict], fieldnames: Sequence[str]) -> None:
    """Write dict rows to a CSV file, preceded by a header line.

    Args:
        destination_path: File to create or overwrite; ``ensure_parent_dir``
            is called on it before opening.
        rows: Dicts to serialise, one per CSV line, in the given order.
        fieldnames: Column names, which also fix the column order.
    """
    ensure_parent_dir(destination_path)
    with destination_path.open("w", newline="") as handle:
        dict_writer = csv.DictWriter(handle, fieldnames=fieldnames)
        dict_writer.writeheader()
        dict_writer.writerows(rows)