1

102 lines
4.1 KiB
Python

"""Calculs de statistiques simples sur les pièces filtrées."""
import csv
from collections import defaultdict
from pathlib import Path
from typing import Dict, Iterable, List, Sequence, Tuple
from lib.filesystem import ensure_parent_dir
from lib.rebrickable.inventory_reconciliation import compute_inventory_gaps
from lib.rebrickable.stats import read_rows as read_stats_rows
def read_rows(path: Path) -> List[dict]:
"""Charge un fichier CSV en mémoire sous forme de dictionnaires."""
with path.open() as csv_file:
reader = csv.DictReader(csv_file)
return list(reader)
def select_non_spare_parts(rows: Iterable[dict]) -> List[dict]:
"""Filtre les pièces en excluant les rechanges."""
return [row for row in rows if row["is_spare"] == "false"]
def variation_key(row: dict) -> Tuple[str, str, str]:
"""Clé d'unicité pour une variation de pièce (référence + couleur)."""
return (row["part_num"], row["color_rgb"], row["is_translucent"])
def color_key(row: dict) -> Tuple[str, str]:
"""Clé d'unicité pour une couleur."""
return (row["color_rgb"], row["is_translucent"])
def aggregate_quantities_by_variation(rows: Iterable[dict]) -> Dict[Tuple[str, str, str], int]:
"""Calcule la quantité totale par variation de pièce (hors rechanges)."""
quantities: Dict[Tuple[str, str, str], int] = defaultdict(int)
for row in rows:
quantities[variation_key(row)] += int(row["quantity_in_set"])
return quantities
def read_total_filtered_parts(stats_path: Path) -> int:
"""Lit le total de pièces attendu pour les thèmes filtrés depuis stats.csv."""
rows = read_stats_rows(stats_path)
return int(
next(row["valeur"] for row in rows if row["libelle"] == "Total de pièces pour les thèmes filtrés")
)
def build_stats(
rows: Iterable[dict],
sets_path: Path,
parts_path: Path,
stats_path: Path,
) -> List[Tuple[str, str]]:
"""Construit les statistiques principales sur les pièces filtrées et les écarts d'inventaire."""
non_spares = select_non_spare_parts(rows)
quantities = aggregate_quantities_by_variation(non_spares)
total_variations = len(quantities)
color_set = {color_key(row) for row in non_spares}
least_used_key = min(quantities, key=quantities.get)
most_used_key = max(quantities, key=quantities.get)
least_used = quantities[least_used_key]
most_used = quantities[most_used_key]
total_non_spare = sum(quantities.values())
gaps = compute_inventory_gaps(sets_path, parts_path)
gap_count = len(gaps)
worst_gap = max(gaps, key=lambda gap: gap["delta"]) if gap_count > 0 else {"set_id": "none", "delta": 0}
catalog_total_parts = read_total_filtered_parts(stats_path)
catalog_inventory_delta = catalog_total_parts - total_non_spare
return [
("Total de variations de pièces (hors rechanges)", str(total_variations)),
(
"Pièce la moins utilisée (référence + couleur)",
f"{least_used_key[0]} / {least_used_key[1]} / {least_used_key[2]} ({least_used})",
),
(
"Pièce la plus commune (référence + couleur)",
f"{most_used_key[0]} / {most_used_key[1]} / {most_used_key[2]} ({most_used})",
),
("Total de couleurs utilisées (hors rechanges)", str(len(color_set))),
("Total de pièces hors rechanges", str(total_non_spare)),
(
"Ecart total catalogue (stats) - inventaire (hors rechanges)",
str(catalog_inventory_delta),
),
("Nombre de sets en écart inventaire/catalogue", str(gap_count)),
("Ecart maximal inventaire/catalogue", f"{worst_gap['set_id']} ({worst_gap['delta']})"),
]
def write_parts_stats(destination_path: Path, stats: Sequence[Tuple[str, str]]) -> None:
"""Écrit les statistiques dans un CSV à deux colonnes."""
ensure_parent_dir(destination_path)
with destination_path.open("w", newline="") as csv_file:
writer = csv.writer(csv_file)
writer.writerow(["libelle", "valeur"])
for label, value in stats:
writer.writerow([label, value])