# 1

# 118 lines
# 4.5 KiB
# Python

"""Mesure la réutilisation des têtes de minifigs dans le catalogue LEGO."""
import csv
from collections import defaultdict
from pathlib import Path
from typing import Dict, Iterable, List, Sequence, Set
from lib.filesystem import ensure_parent_dir
from lib.rebrickable.minifig_character_sets import load_sets
from lib.rebrickable.minifigs_by_set import load_parts_catalog, select_head_parts
from lib.rebrickable.parts_inventory import (
index_inventory_parts_by_inventory,
normalize_boolean,
select_latest_inventories,
)
from lib.rebrickable.stats import read_rows
def load_minifigs_by_set(path: Path) -> List[dict]:
    """Load the rows of the minifigs_by_set CSV file.

    Args:
        path: Location of the CSV file to read.

    Returns:
        The rows produced by ``read_rows`` for that file.
    """
    rows = read_rows(path)
    return rows
def build_head_presence(
    inventories_path: Path,
    inventory_parts_path: Path,
    head_parts: Set[str],
) -> Dict[str, Set[str]]:
    """Index the sets that contain each head part, ignoring spare parts.

    Args:
        inventories_path: File passed to ``select_latest_inventories``.
        inventory_parts_path: File passed to
            ``index_inventory_parts_by_inventory``.
        head_parts: Part numbers that count as heads; every other part
            row is ignored.

    Returns:
        A mapping from head part number to the set numbers whose selected
        inventory lists that part as a non-spare entry. Heads that never
        appear are absent from the mapping.
    """
    latest_inventories = select_latest_inventories(inventories_path)
    rows_by_inventory = index_inventory_parts_by_inventory(inventory_parts_path)
    sets_by_head: Dict[str, Set[str]] = {}
    for set_num, inventory in latest_inventories.items():
        # Entries keyed "fig-..." are skipped (presumably minifig
        # inventories rather than real sets -- confirm against the data).
        if set_num.startswith("fig-"):
            continue
        for entry in rows_by_inventory.get(inventory["id"], []):
            part_num = entry["part_num"]
            is_head = part_num in head_parts
            # Spare copies do not count as the head being used in the set.
            if is_head and normalize_boolean(entry["is_spare"]) != "true":
                sets_by_head.setdefault(part_num, set()).add(set_num)
    return sets_by_head
def build_filtered_presence(minifigs_rows: Iterable[dict]) -> Dict[str, Set[str]]:
    """Index the filtered sets that contain each head, extras excluded.

    Args:
        minifigs_rows: Rows exposing the ``known_character``, ``part_num``
            and ``set_num`` keys.

    Returns:
        A mapping from head part number to the set numbers it appears in.
        Rows whose ``known_character`` is ``"Figurant"`` are ignored.
    """
    sets_by_head: Dict[str, Set[str]] = {}
    for entry in minifigs_rows:
        if entry["known_character"] != "Figurant":
            sets_by_head.setdefault(entry["part_num"], set()).add(entry["set_num"])
    return sets_by_head
def build_character_labels(minifigs_rows: Iterable[dict]) -> Dict[str, str]:
    """Pick one representative character name per head, extras excluded.

    Args:
        minifigs_rows: Rows exposing the ``known_character`` and
            ``part_num`` keys.

    Returns:
        A mapping from head part number to the smallest (in sort order)
        character name seen for that head. Rows whose ``known_character``
        is ``"Figurant"`` are ignored, so a head used only by extras has
        no entry.
    """
    names_by_head: Dict[str, Set[str]] = defaultdict(set)
    for entry in minifigs_rows:
        name = entry["known_character"]
        if name != "Figurant":
            names_by_head[entry["part_num"]].add(name)
    # Each collected set holds at least one name, so min() cannot fail on
    # an empty argument.
    return {part_num: min(names) for part_num, names in names_by_head.items()}
def aggregate_head_reuse(
    minifigs_rows: Iterable[dict],
    parts_catalog: Dict[str, dict],
    head_presence: Dict[str, Set[str]],
    sets_lookup: Dict[str, dict],
) -> List[dict]:
    """Build the table of heads found in the filtered sets with their global reuse.

    Args:
        minifigs_rows: Rows exposing ``known_character``, ``part_num`` and
            ``set_num``. Any iterable is accepted, including one-shot
            iterators such as generators.
        parts_catalog: Mapping from part number to a record with a
            ``name`` key. Every head found in the filtered rows must be
            present.
        head_presence: Mapping from head part number to the set numbers
            that contain it across the whole catalogue.
        sets_lookup: Mapping from set number to a record that may carry a
            ``set_id`` key.

    Returns:
        One dict per head found in the filtered rows, with the keys
        ``part_num``, ``part_name``, ``known_character``,
        ``sample_set_id``, ``filtered_sets``, ``other_sets`` and
        ``total_sets``. The three counts are stored as strings. Rows are
        sorted by ascending ``other_sets``, then descending
        ``filtered_sets``, then ``part_num``.

    Raises:
        KeyError: If a head is missing from ``parts_catalog``.
    """
    # The rows are traversed twice below. A one-shot iterator would be
    # exhausted by the first pass and leave every label empty, so the
    # rows are materialized once up front.
    rows = list(minifigs_rows)
    filtered_presence = build_filtered_presence(rows)
    labels = build_character_labels(rows)

    aggregates: List[dict] = []
    for part_num, filtered_sets in filtered_presence.items():
        # Copy before merging so the caller's head_presence sets are
        # never mutated.
        all_sets = set(head_presence.get(part_num, ()))
        all_sets.update(filtered_sets)
        other_sets = all_sets - filtered_sets

        # Deterministic sample: the smallest filtered set number.
        sample_set = min(filtered_sets)
        # Fall back to the text before the first "-" when the lookup has
        # no set_id for that set.
        fallback_id = sample_set.split("-")[0]
        sample_set_id = sets_lookup.get(sample_set, {}).get("set_id", fallback_id)

        aggregates.append(
            {
                "part_num": part_num,
                "part_name": parts_catalog[part_num]["name"],
                "known_character": labels.get(part_num, ""),
                "sample_set_id": sample_set_id,
                "filtered_sets": str(len(filtered_sets)),
                "other_sets": str(len(other_sets)),
                "total_sets": str(len(all_sets)),
            }
        )

    aggregates.sort(
        key=lambda row: (
            int(row["other_sets"]),
            -int(row["filtered_sets"]),
            row["part_num"],
        )
    )
    return aggregates
def write_head_reuse(destination_path: Path, rows: Sequence[dict]) -> None:
    """Write the CSV comparing filtered head usage with the rest of the catalogue.

    Args:
        destination_path: Output file; ``ensure_parent_dir`` is called on
            it before the file is opened for writing.
        rows: Records keyed by the output columns ``part_num``,
            ``part_name``, ``known_character``, ``sample_set_id``,
            ``filtered_sets``, ``other_sets`` and ``total_sets``.
    """
    ensure_parent_dir(destination_path)
    columns = [
        "part_num",
        "part_name",
        "known_character",
        "sample_set_id",
        "filtered_sets",
        "other_sets",
        "total_sets",
    ]
    # newline="" lets the csv module control line endings itself.
    with destination_path.open("w", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        writer.writerows(rows)