"""Calculs de statistiques simples sur les pièces filtrées.""" import csv from collections import defaultdict from pathlib import Path from typing import Dict, Iterable, List, Sequence, Tuple from lib.filesystem import ensure_parent_dir from lib.rebrickable.inventory_reconciliation import compute_inventory_gaps from lib.rebrickable.stats import read_rows as read_stats_rows def read_rows(path: Path) -> List[dict]: """Charge un fichier CSV en mémoire sous forme de dictionnaires.""" with path.open() as csv_file: reader = csv.DictReader(csv_file) return list(reader) def select_non_spare_parts(rows: Iterable[dict]) -> List[dict]: """Filtre les pièces en excluant les rechanges.""" return [row for row in rows if row["is_spare"] == "false"] def variation_key(row: dict) -> Tuple[str, str, str]: """Clé d'unicité pour une variation de pièce (référence + couleur).""" return (row["part_num"], row["color_rgb"], row["is_translucent"]) def color_key(row: dict) -> Tuple[str, str]: """Clé d'unicité pour une couleur.""" return (row["color_rgb"], row["is_translucent"]) def aggregate_quantities_by_variation(rows: Iterable[dict]) -> Dict[Tuple[str, str, str], int]: """Calcule la quantité totale par variation de pièce (hors rechanges).""" quantities: Dict[Tuple[str, str, str], int] = defaultdict(int) for row in rows: quantities[variation_key(row)] += int(row["quantity_in_set"]) return quantities def read_total_filtered_parts(stats_path: Path) -> int: """Lit le total de pièces attendu pour les thèmes filtrés depuis stats.csv.""" rows = read_stats_rows(stats_path) return int( next(row["valeur"] for row in rows if row["libelle"] == "Total de pièces pour les thèmes filtrés") ) def build_stats( rows: Iterable[dict], sets_path: Path, parts_path: Path, stats_path: Path, ) -> List[Tuple[str, str]]: """Construit les statistiques principales sur les pièces filtrées et les écarts d'inventaire.""" non_spares = select_non_spare_parts(rows) quantities = aggregate_quantities_by_variation(non_spares) total_variations = len(quantities) color_set = {color_key(row) for row in non_spares} least_used_key = min(quantities, key=quantities.get) most_used_key = max(quantities, key=quantities.get) least_used = quantities[least_used_key] most_used = quantities[most_used_key] total_non_spare = sum(quantities.values()) gaps = compute_inventory_gaps(sets_path, parts_path) gap_count = len(gaps) worst_gap = max(gaps, key=lambda gap: gap["delta"]) if gap_count > 0 else {"set_id": "none", "delta": 0} catalog_total_parts = read_total_filtered_parts(stats_path) catalog_inventory_delta = catalog_total_parts - total_non_spare return [ ("Total de variations de pièces (hors rechanges)", str(total_variations)), ( "Pièce la moins utilisée (référence + couleur)", f"{least_used_key[0]} / {least_used_key[1]} / {least_used_key[2]} ({least_used})", ), ( "Pièce la plus commune (référence + couleur)", f"{most_used_key[0]} / {most_used_key[1]} / {most_used_key[2]} ({most_used})", ), ("Total de couleurs utilisées (hors rechanges)", str(len(color_set))), ("Total de pièces hors rechanges", str(total_non_spare)), ( "Ecart total catalogue (stats) - inventaire (hors rechanges)", str(catalog_inventory_delta), ), ("Nombre de sets en écart inventaire/catalogue", str(gap_count)), ("Ecart maximal inventaire/catalogue", f"{worst_gap['set_id']} ({worst_gap['delta']})"), ] def write_parts_stats(destination_path: Path, stats: Sequence[Tuple[str, str]]) -> None: """Écrit les statistiques dans un CSV à deux colonnes.""" ensure_parent_dir(destination_path) with destination_path.open("w", newline="") as csv_file: writer = csv.writer(csv_file) writer.writerow(["libelle", "valeur"]) for label, value in stats: writer.writerow([label, value])