commit 22b4dae0ba8afd5c172fffe2ce459218692774fc Author: Richard Dern Date: Mon Dec 1 21:57:05 2025 +0100 Premiers éléments de l'étude diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c900c2f --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.env +__pycache__ +.pytest_cache +data/ diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..f00c12b --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,58 @@ +# AGENTS.md + +## Présentation du projet + +Ce projet contient le code et les ressources nécessaires à une étude portant sur les sets LEGO Jurassic World que je possède (mais extensible à d'autres sets LEGO si on le souhaite). +Ce qui est produit ici servira à la publication d'un ou plusieurs articles sur mon blog. + +## Instructions générales + +### Interactions avec l'utilisateur + +- L'utilisateur préfère être vouvoyé +- Répondre à l'utilisateur en français, quelle que soit la langue utilisée ailleurs +- Si une demande prête à confusion, demander une clarification à l'utilisateur plutôt que d'essayer de deviner +- L'utilisateur préfère considérer l'agent comme une entité féminine +- L'agent est un expert du développement web et maitrise toute technologie afférente +- L'utilisateur préfère qu'on s'adresse à lui avec un ton bienveillant, en utilisant un vocabulaire soutenu, approprié au contexte +- L'autisme de l'utilisateur doit être pris en compte au cours des échanges +- L'utilisateur peut parfois perdre patience : la discussion doit être désamorcée rapidement par une solution que l'agent mettra en place, testera et modifiera jusqu'à ce qu'elle produise le résultat escompté + +### Git + +- Ne jamais faire de `git push` automatiquement, sauf si demandé explicitement +- Les commits doivent être atomiques +- Les commits sont écrits en français +- Aucun commit ne doit être effectué sans demande explicite de l'utilisateur + +### Code + +- Le code doit être architecturé, propre, clair, concis, minimaliste + - Préférer la création de plusieurs petits fichiers pour éviter le code monolithique + - Ne pas implémenter de fonctionnalités non demandées + - Ne pas implémenter de paramètres de ligne de commande non demandés + - Ne pas créer de paramètres non demandés + - Respecter les principes DRY, KISS et SOLID + - On préfèrera du code piloté par la configuration plutôt que par l'utilisateur + - Si une librairie existe pour accomplir une tâche donnée, exploiter cette librairie + - S'assurer de sa popularité et de son activité récente +- Le code doit être écrit en anglais mais documenté en français, lisible par un humain + - Documenter clairement toutes les fonctions et méthodes + - Choisir des noms de variables appropriées et compréhensibles, tout en restant courts +- La gestion des erreurs est considérée comme dangereuse + - On ne doit jamais utiliser de fallbacks + - On ne doit jamais utiliser de structures de type `try/catch` ou équivalentes + - On ne doit jamais tester l'existence ou la définition d'une valeur + - On ne doit jamais vérifier l'intégrité des données manipulées + - Ces mesures visent à prévenir l'obfuscation de problèmes plus profonds + +### Outils en console + +- S'assurer de l'harmonisation des entrées/sorties + - Toujours utiliser le français dans les interactions avec l'utilisateur + +### Python + +- Accéder à l'environnement virtuel via la commande `source .venv/bin/activate` (initialiser l'environnement si `.venv/` n'existe pas) +- Renseigner les dépendances dans le fichier `requirements.txt` +- Toujours accompagner les librairies et les scripts par des tests unitaires/fonctionnels diff --git a/README.md b/README.md new file mode 100644 index 0000000..fba71a5 --- /dev/null +++ b/README.md @@ -0,0 +1,157 @@ +# Étude de sets LEGO + +## Présentation du projet + +Ce projet vise à étudier statistiquement des sets [LEGO](https://www.lego.com/fr-fr). +Le projet est construit autour du thème [_Jurassic World_](https://www.lego.com/fr-fr/themes/jurassic-world), mais le paramétrage des scripts devrait permettre d'étudier les sets de n'importe quel thème LEGO. + +L'objectif est de mettre en lumière des éléments spécifiques, tels que des pièces rares, des couleurs originales, ou des _minifigs_ particulières. +Cette étude doit permettre de piquer la curiosité en explorant l'évolution de notre thème préféré à travers le temps. + +Cette étude a pour ambition de satisfaire ma curiosité, mon désir d'approfondir un sujet qui m'est cher, et mon besoin compulsif d'exhaustivité. +Découvrir qu'un nouveau set LEGO _Jurassic World_, posséder le set, le construire, puis l'exposer, jouer avec, recréer l'ambiance des œuvres originales, en imprégner mon bureau ; tout cela n'est qu'une partie de cet univers, qui recèle d'autres informations à découvrir et explorer. + +Enfin, sur un plan plus technique, je souhaite améliorer mes compétences en python sur des sujets concrets que je maitrise, en l'exploitant notamment avec des librairies destinées à la production de statistiques. + +## Organisation actuelle + +- Les fichiers téléchargés ou produits sont rangés dans `data/` : + - `data/raw/` contient les données brutes Rebrickable (fichiers compressés et décompressés). + - `data/intermediate/` regroupe les transformations intermédiaires (filtres, enrichissements, rapports). + - `data/final/` stocke les exports finaux prêts pour les statistiques et graphiques. +- Les scripts créent automatiquement les répertoires parents nécessaires pour leurs sorties. +- Les scripts d'orchestration se trouvent dans `scripts/`. +- Le code mutualisé est rangé dans `lib/`. + +## Ordre d'exécution + +### Étape 1 : récupérer les thèmes Rebrickable + +1. `source .venv/bin/activate` +2. `python -m scripts.download_themes` + +Le script télécharge le fichier compressé `themes.csv.gz` depuis Rebrickable vers `data/raw/`, le décompresse immédiatement en `themes.csv`, supprime l'archive `.gz`, et ne retélécharge pas le fichier si `themes.csv` a moins de 7 jours. + +### Étape 2 : définir les thèmes à étudier + +Renseigner dans `.env` la liste des identifiants de thèmes (séparés par des virgules). Pour l'univers _Jurassic Park / Jurassic World_, les identifiants relevés dans `data/raw/themes.csv` sont : + +- 274 (`Jurassic Park III`, parent `Studios`) +- 602 (`Jurassic World`, parent racine) +- 620 (`Jurassic World: Fallen Kingdom`, parent `Juniors`) + +L'identifiant 722 (`Jurassic World`, parent `Duplo`) est volontairement ignoré. + +### Étape 3 : récupérer les sets Rebrickable + +1. `source .venv/bin/activate` +2. `python -m scripts.download_sets` + +Le script télécharge le fichier compressé `sets.csv.gz` depuis Rebrickable vers `data/raw/`, le décompresse immédiatement en `sets.csv`, supprime l'archive `.gz`, et ne retélécharge pas le fichier si `sets.csv` a moins de 7 jours. + +### Étape 4 : filtrer les sets sur les thèmes ciblés + +1. `source .venv/bin/activate` +2. `python -m scripts.filter_sets` + +Le script lit `THEME_IDS` depuis `.env`, prend `data/raw/sets.csv` en entrée, applique les corrections déclarées dans `config/num_parts_overrides.csv`, et produit `data/intermediate/sets_filtered.csv` contenant uniquement les lignes dont le `theme_id` appartient aux thèmes sélectionnés et dont `num_parts` est strictement supérieur à 0. + +Corrections manuelles connues (`config/num_parts_overrides.csv`) : + +| set_num | num_parts | commentaire | +| -------- | --------- | ------------------------------------- | +| 122220-1 | 30 | Sachet promotionnel annoncé 30 pièces | + +### Étape 5 : enrichir les sets filtrés + +1. `source .venv/bin/activate` +2. `python -m scripts.enrich_sets` + +Le script lit `data/intermediate/sets_filtered.csv`, ajoute : + +- `set_id` (partie avant le tiret dans `set_num`) +- `rebrickable_url` (URL publique du set sur Rebrickable) +- `in_collection` (`true/false` selon la présence du set dans le dossier `MY_SETS`) + +La variable `MY_SETS` (définie dans `.env`) doit pointer vers un dossier contenant un sous-dossier par identifiant LEGO possédé. Si la variable est vide, que le dossier est absent ou vide, la colonne `in_collection` sera à `false` pour tous les sets. Les sorties sont `data/intermediate/sets_enriched.csv` et `data/final/sets_missing.md`. + +### Étape 6 : calculer des statistiques basiques + +1. `source .venv/bin/activate` +2. `python -m scripts.compute_stats` + +Le script lit `data/raw/themes.csv`, `data/raw/sets.csv`, `data/intermediate/sets_filtered.csv` et `data/intermediate/sets_enriched.csv`, puis écrit `data/final/stats.csv` avec deux colonnes (`libelle`, `valeur`) contenant notamment : + +- nombre total de sets (catalogue complet) +- nombre de sets filtrés et pourcentage vs total +- nombre moyen de sets par thème (catalogue complet) +- sets en collection / sets manquants +- taux de possession +- moyenne, médiane et total de pièces pour les thèmes filtrés +- moyenne de sets commercialisés par an +- bornes d'années et nombre de thèmes filtrés +- année la plus prolifique +- set le plus fourni / le moins fourni en pièces +- set le plus ancien / le plus récent +- moyenne de pièces des sets possédés / manquants +- total de pièces des sets possédés +- pourcentage de pièces possédées +- moyenne de sets par thème (catalogue complet, via `themes.csv`) +- nombre total de thèmes (catalogue complet) + +### Milestones (jalons chronologiques) + +Les jalons sont configurés dans `config/milestones.csv` (colonnes `year`, `description`). Ils sont indépendants des thèmes sélectionnés : pour un autre univers (ex. Star Wars), il suffit de remplacer ou adapter ce fichier sans modifier le code. + +### Étape 7 : graphique du nombre de sets par année + +1. `source .venv/bin/activate` +2. `python -m scripts.plot_sets_per_year` + +Le script lit `data/intermediate/sets_enriched.csv`, les jalons `config/milestones.csv`, et produit `figures/step07/sets_per_year.png` montrant : + +- le nombre de sets par année (barres) +- la moyenne cumulative des sets (courbe) +- le total de pièces par année (barres) +- la moyenne cumulative des pièces par set (courbe) annoté avec les jalons chronologiques + +En parallèle, le script `python -m scripts.plot_parts_per_set` génère `figures/step07/avg_parts_per_set.png` avec la moyenne annuelle de pièces par set et une moyenne glissante (3 ans) pour mettre en évidence la tendance sans diluer l'historique. + +### Étape 8 : télécharger les données détaillées des pièces + +1. `source .venv/bin/activate` +2. `python -m scripts.download_parts_data` + +Le script télécharge les fichiers compressés `inventories.csv.gz`, `inventory_parts.csv.gz`, `inventory_minifigs.csv.gz`, `minifigs.csv.gz`, `parts.csv.gz` et `colors.csv.gz` vers `data/raw/`, les décompresse immédiatement en supprimant chaque archive `.gz`, et ne retélécharge pas les fichiers âgés de moins de 7 jours (cache fondé sur les CSV décompressés). Ces données complètent les sets en décrivant leurs inventaires, les pièces individuelles, les minifigs associées et les couleurs disponibles. + +### Étape 9 : assembler l'inventaire des pièces par set + +1. `source .venv/bin/activate` +2. `python -m scripts.build_parts_inventory` + +Le script lit `data/intermediate/sets_enriched.csv`, `data/raw/inventories.csv`, `data/raw/inventory_parts.csv`, `data/raw/inventory_minifigs.csv`, `data/raw/minifigs.csv` et `data/raw/colors.csv`, sélectionne la version d'inventaire la plus récente pour chaque set, puis produit `data/intermediate/parts_filtered.csv` contenant : `part_num`, `color_rgb`, `is_translucent`, `set_num`, `set_id`, `quantity_in_set`, `is_spare`. Les minifigs sont éclatées en pièces en exploitant leur propre inventaire (présent dans `inventories.csv` + `inventory_parts.csv`) et leurs quantités dans `inventory_minifigs.csv`. Ce fichier sert de base aux analyses ultérieures sans relire les CSV bruts. + +### Étape 10 : identifier les écarts d'inventaire + +1. `source .venv/bin/activate` +2. `python -m scripts.report_inventory_gaps` + +Le script lit `data/intermediate/sets_enriched.csv` et `data/intermediate/parts_filtered.csv`, calcule pour chaque set filtré le total de pièces (rechanges incluses), et produit `data/intermediate/inventory_gaps.csv` avec les colonnes : + +- `set_num` +- `set_id` +- `expected_parts` (`num_parts` dans `sets_enriched.csv`) +- `inventory_parts` (somme de `quantity_in_set` dans `parts_filtered.csv`, rechanges incluses) +- `delta` (valeur absolue de `expected_parts - inventory_parts`) +- `in_collection` (valeur issue de `sets_enriched.csv`) + +Seuls les sets dont les totaux diffèrent figurent dans ce fichier. Aucune tentative de correction n'est effectuée : l'inventaire existant reste la référence malgré les éventuels manques du catalogue Rebrickable. + +Un tableau Markdown est également généré dans `data/final/inventory_gaps.md` listant ces sets avec leur nom, l'écart observé et un lien vers les instructions LEGO. + +### Étape 11 : statistiques simples sur les pièces + +1. `source .venv/bin/activate` +2. `python -m scripts.compute_parts_stats` + +Le script lit `data/intermediate/parts_filtered.csv` et `data/final/stats.csv` (pour le total catalogue filtré), puis produit `data/final/parts_stats.csv` avec : nombre de variations de pièces (hors rechanges), pièce la moins utilisée, pièce la plus commune, nombre de couleurs utilisées, total de pièces hors rechanges, écart entre le total de pièces attendu (stats catalogue) et l'inventaire agrégé, nombre de sets présentant un écart inventaire/catalogue et écart maximal observé. diff --git a/config/milestones.csv b/config/milestones.csv new file mode 100644 index 0000000..bfb8768 --- /dev/null +++ b/config/milestones.csv @@ -0,0 +1,15 @@ +year,description +1993,Jurassic Park +1997,The Lost World: Jurassic Park +2001,Jurassic Park III +2015,Jurassic World +2015,LEGO Jurassic World +2018,Jurassic World: Fallen Kingdom +2018,LEGO Jurassic World: The Secret Exhibit +2018,Jurassic World Evolution +2019,LEGO Jurassic World: Legend of Isla Nublar +2020,Jurassic World: Camp Cretaceous +2021,Jurassic World Evolution 2 +2022,Jurassic World Dominion +2024,Jurassic World: Chaos Theory +2025,Jurassic World Rebirth \ No newline at end of file diff --git a/config/num_parts_overrides.csv b/config/num_parts_overrides.csv new file mode 100644 index 0000000..bcb70fb --- /dev/null +++ b/config/num_parts_overrides.csv @@ -0,0 +1,2 @@ +set_num,num_parts +122220-1,30 diff --git a/lib/__init__.py b/lib/__init__.py new file mode 100644 index 0000000..d8a4120 --- /dev/null +++ b/lib/__init__.py @@ -0,0 +1 @@ +"""Fonctions de support pour l'étude des sets LEGO.""" diff --git a/lib/color_sort.py b/lib/color_sort.py new file mode 100644 index 0000000..1dbf87c --- /dev/null +++ b/lib/color_sort.py @@ -0,0 +1,59 @@ +"""Outils de tri de couleurs dans un espace perceptuel.""" + +import math +from typing import Iterable, List, Tuple + +import numpy as np +from colorspacious import cspace_convert + + +def hex_to_rgb_unit(hex_value: str) -> np.ndarray: + """Convertit un code hexadécimal en tableau RGB normalisé (0-1).""" + return np.array([int(hex_value[index : index + 2], 16) / 255 for index in (0, 2, 4)], dtype=float) + + +def lab_components(hex_value: str) -> Tuple[float, float, float, float, float]: + """Retourne (hue_angle, chroma, lightness, a*, b*) pour une couleur.""" + l_component, a_component, b_component = cspace_convert(hex_to_rgb_unit(hex_value), "sRGB1", "CIELab") + hue_angle = math.atan2(b_component, a_component) + chroma = math.hypot(a_component, b_component) + return hue_angle, chroma, l_component, a_component, b_component + + +def sort_hex_colors_lab( + hex_values: Iterable[str], + hue_offset_degrees: float = 60.0, + neutral_threshold: float = 3.0, +) -> List[str]: + """ + Trie des couleurs par teinte perceptuelle, puis chroma et luminosité. + + - Les couleurs quasi neutres (chroma < seuil) sont déplacées en fin de liste, triées par luminosité. + - Le cercle chromatique peut être décalé via hue_offset_degrees (par défaut 60° pour démarrer vers le jaune). + """ + offset_radians = math.radians(hue_offset_degrees) + chromatic: List[Tuple[float, float, float, str]] = [] + neutrals: List[Tuple[float, str]] = [] + for hex_value in hex_values: + hue_angle, chroma, lightness, _, _ = lab_components(hex_value) + if chroma < neutral_threshold: + neutrals.append((lightness, hex_value)) + continue + hue = hue_angle + offset_radians + if hue < 0: + hue += 2 * math.pi + chromatic.append((hue, -chroma, lightness, hex_value)) + chromatic.sort() + neutrals.sort() + return [item[3] for item in chromatic] + [item[1] for item in neutrals] + + +def lab_sort_key(hex_value: str, hue_offset_degrees: float = 60.0, neutral_threshold: float = 3.0) -> Tuple[int, float, float, float]: + """Clé de tri unique (bucket chromatique/neutre) pour un usage ponctuel.""" + hue_angle, chroma, lightness, _, _ = lab_components(hex_value) + if chroma < neutral_threshold: + return (1, 0.0, lightness, chroma) + hue = hue_angle + math.radians(hue_offset_degrees) + if hue < 0: + hue += 2 * math.pi + return (0, hue, -chroma, lightness) diff --git a/lib/filesystem.py b/lib/filesystem.py new file mode 100644 index 0000000..ce399cc --- /dev/null +++ b/lib/filesystem.py @@ -0,0 +1,8 @@ +"""Fonctions utilitaires pour manipuler le système de fichiers.""" + +from pathlib import Path + + +def ensure_parent_dir(target_path: Path) -> None: + """Crée le répertoire parent d'un chemin de fichier s'il est absent.""" + target_path.parent.mkdir(parents=True, exist_ok=True) diff --git a/lib/milestones.py b/lib/milestones.py new file mode 100644 index 0000000..109dfc2 --- /dev/null +++ b/lib/milestones.py @@ -0,0 +1,15 @@ +"""Chargement des jalons (milestones) thématiques configurables.""" + +import csv +from pathlib import Path +from typing import List + + +def load_milestones(path: Path) -> List[dict]: + """Charge la liste des jalons depuis un fichier CSV à deux colonnes (year, description).""" + milestones = [] + with path.open() as csv_file: + reader = csv.DictReader(csv_file) + for row in reader: + milestones.append({"year": int(row["year"]), "description": row["description"]}) + return milestones diff --git a/lib/plots/__init__.py b/lib/plots/__init__.py new file mode 100644 index 0000000..5c14acc --- /dev/null +++ b/lib/plots/__init__.py @@ -0,0 +1 @@ +"""Utilitaires de visualisation des données LEGO.""" diff --git a/lib/plots/colors_grid.py b/lib/plots/colors_grid.py new file mode 100644 index 0000000..2f1d43e --- /dev/null +++ b/lib/plots/colors_grid.py @@ -0,0 +1,174 @@ +"""Visualisation des couleurs utilisées dans l'inventaire filtré.""" + +from pathlib import Path +from typing import Dict, Iterable, List, Tuple + +import matplotlib.pyplot as plt +import numpy as np +from matplotlib.lines import Line2D + +from lib.filesystem import ensure_parent_dir +from lib.color_sort import lab_sort_key, sort_hex_colors_lab +from lib.rebrickable.parts_inventory import normalize_boolean +from lib.rebrickable.stats import read_rows + + +def sort_colors_perceptually(colors: Iterable[dict]) -> List[dict]: + """Trie les couleurs via l'espace Lab (teinte perçue, chroma, luminosité).""" + ordered_hex = sort_hex_colors_lab(color["color_rgb"] for color in colors) + index_map = {hex_value: index for index, hex_value in enumerate(ordered_hex)} + return sorted(colors, key=lambda color: index_map[color["color_rgb"]]) + + +def load_used_colors(parts_path: Path, colors_path: Path, minifig_only: bool = False) -> List[dict]: + """Charge les couleurs utilisées (hors rechanges) et leurs quantités totales. + + Si minifig_only est vrai, ne conserve que les pièces marquées is_minifig_part=true. + Sinon, exclut les pièces de minifig. + """ + rows = read_rows(parts_path) + colors_lookup = {(row["rgb"], normalize_boolean(row["is_trans"])): row["name"] for row in read_rows(colors_path)} + totals: Dict[Tuple[str, str], int] = {} + for row in rows: + if minifig_only and row.get("is_minifig_part") != "true": + continue + if not minifig_only and row.get("is_minifig_part") == "true": + continue + key = (row["color_rgb"], row["is_translucent"]) + totals[key] = totals.get(key, 0) + int(row["quantity_in_set"]) + used_colors = [] + for (color_rgb, is_translucent), quantity in totals.items(): + used_colors.append( + { + "color_rgb": color_rgb, + "is_translucent": is_translucent, + "name": colors_lookup.get((color_rgb, is_translucent), color_rgb), + "quantity": quantity, + } + ) + return sort_colors_perceptually(used_colors) + + +def build_hex_positions(count: int, columns: int = 9, spacing: float = 1.1) -> List[Tuple[float, float]]: + """Construit des positions hexagonales pour une mise en page aérée.""" + positions: List[Tuple[float, float]] = [] + rows = (count + columns - 1) // columns + vertical_gap = spacing * 0.85 + for row in range(rows): + offset = 0.0 if row % 2 == 0 else spacing / 2 + for col in range(columns): + index = row * columns + col + if index >= count: + return positions + x = col * spacing + offset + y = -row * vertical_gap + positions.append((x, y)) + return positions + + +def build_background(width: float, height: float, resolution: int = 600) -> np.ndarray: + """Génère un fond dégradé pour mettre en valeur les couleurs translucides.""" + x = np.linspace(-1.0, 1.0, resolution) + y = np.linspace(-1.0, 1.0, resolution) + xv, yv = np.meshgrid(x, y) + radial = np.sqrt(xv**2 + yv**2) + diagonal = (xv + yv) / 2 + layer = 0.35 + 0.35 * (1 - radial) + 0.2 * diagonal + layer = np.clip(layer, 0.05, 0.95) + background = np.dstack((layer * 0.9, layer * 0.92, layer)) + return background + + +def plot_colors_grid( + parts_path: Path, + colors_path: Path, + destination_path: Path, + minifig_only: bool = False, +) -> None: + """Dessine une grille artistique des couleurs utilisées.""" + colors = load_used_colors(parts_path, colors_path, minifig_only=minifig_only) + positions = build_hex_positions(len(colors)) + x_values = [x for x, _ in positions] + y_values = [y for _, y in positions] + width = max(x_values) - min(x_values) + 1.5 + height = max(y_values) - min(y_values) + 1.5 + + fig, ax = plt.subplots(figsize=(10, 10), facecolor="#0b0c10") + background = build_background(width, height) + ax.imshow( + background, + extent=[min(x_values) - 0.75, min(x_values) - 0.75 + width, min(y_values) - 0.75, min(y_values) - 0.75 + height], + origin="lower", + zorder=0, + ) + + max_quantity = max(color["quantity"] for color in colors) + min_marker = 720 + max_marker = 1600 + + for (x, y), color in zip(positions, colors): + is_translucent = color["is_translucent"] == "true" + alpha = 0.65 if is_translucent else 1.0 + edge = "#f7f7f7" if is_translucent else "#0d0d0d" + size = min_marker + (max_marker - min_marker) * (color["quantity"] / max_quantity) + if is_translucent: + ax.scatter( + x, + y, + s=size * 1.25, + c="#ffffff", + alpha=0.18, + edgecolors="none", + linewidths=0, + zorder=2, + ) + ax.scatter( + x, + y, + s=size, + c=f"#{color['color_rgb']}", + alpha=alpha, + edgecolors=edge, + linewidths=1.1, + zorder=3, + ) + + legend_handles = [ + Line2D([0], [0], marker="o", color="none", markerfacecolor="#cccccc", markeredgecolor="#0d0d0d", markersize=10, label="Opaque"), + Line2D( + [0], + [0], + marker="o", + color="none", + markerfacecolor="#cccccc", + markeredgecolor="#f7f7f7", + markersize=10, + alpha=0.65, + label="Translucide", + ), + ] + legend_y = 1.06 if not minifig_only else 1.08 + ax.legend( + handles=legend_handles, + loc="upper center", + bbox_to_anchor=(0.5, legend_y), + ncol=2, + frameon=False, + labelcolor="#f0f0f0", + ) + + title_prefix = "Palette des couleurs utilisées (rechanges incluses)" + if minifig_only: + title_prefix = "Palette des couleurs de minifigs (rechanges incluses)" + ax.set_title(title_prefix, fontsize=14, color="#f0f0f0", pad=28) + ax.set_xticks([]) + ax.set_yticks([]) + ax.set_xlim(min(x_values) - 1.0, max(x_values) + 1.0) + ax.set_ylim(min(y_values) - 1.0, max(y_values) + 1.0) + for spine in ax.spines.values(): + spine.set_visible(False) + + ensure_parent_dir(destination_path) + fig.tight_layout() + fig.savefig(destination_path, dpi=200) + plt.close(fig) diff --git a/lib/plots/parts_per_set.py b/lib/plots/parts_per_set.py new file mode 100644 index 0000000..5400b5e --- /dev/null +++ b/lib/plots/parts_per_set.py @@ -0,0 +1,110 @@ +"""Graphiques sur la taille moyenne des sets (pièces par set).""" + +from pathlib import Path +from typing import Dict, Iterable, List, Tuple + +import matplotlib.pyplot as plt + +from lib.filesystem import ensure_parent_dir +from lib.milestones import load_milestones +from lib.rebrickable.stats import read_rows + + +def compute_average_parts_per_set(rows: Iterable[dict]) -> List[Tuple[int, float]]: + """Calcule la moyenne annuelle de pièces par set.""" + per_year: Dict[int, Dict[str, int]] = {} + for row in rows: + year = int(row["year"]) + per_year[year] = per_year.get(year, {"parts": 0, "sets": 0}) + per_year[year]["parts"] += int(row["num_parts"]) + per_year[year]["sets"] += 1 + results: List[Tuple[int, float]] = [] + for year in sorted(per_year): + totals = per_year[year] + results.append((year, totals["parts"] / totals["sets"])) + return results + + +def compute_rolling_mean(series: List[Tuple[int, float]], window: int) -> List[Tuple[int, float]]: + """Calcule la moyenne glissante sur une fenêtre donnée.""" + values = [value for _, value in series] + years = [year for year, _ in series] + rolling: List[Tuple[int, float]] = [] + for index in range(len(values)): + if index + 1 < window: + rolling.append((years[index], 0.0)) + else: + window_values = values[index - window + 1 : index + 1] + rolling.append((years[index], sum(window_values) / window)) + return rolling + + +def plot_parts_per_set( + enriched_sets_path: Path, + milestones_path: Path, + destination_path: Path, + rolling_window: int = 3, +) -> None: + """Génère un graphique de la moyenne annuelle et glissante des pièces par set.""" + sets_rows = read_rows(enriched_sets_path) + milestones = load_milestones(milestones_path) + annual_series = compute_average_parts_per_set(sets_rows) + rolling_series = compute_rolling_mean(annual_series, rolling_window) + years = [year for year, _ in annual_series] + annual_values = [value for _, value in annual_series] + rolling_values = [value for _, value in rolling_series] + + fig, ax = plt.subplots(figsize=(12, 6)) + ax.plot(years, annual_values, marker="o", color="#2ca02c", label="Moyenne annuelle (pièces/set)") + ax.plot( + years, + rolling_values, + marker="^", + color="#9467bd", + label=f"Moyenne glissante {rolling_window} ans (pièces/set)", + ) + ax.set_xlabel("Année") + ax.set_ylabel("Pièces par set") + ax.set_title("Évolution de la taille moyenne des sets (thèmes filtrés)") + ax.grid(True, linestyle="--", alpha=0.3) + ax.set_xlim(min(years) - 0.4, max(years) + 0.4) + ax.set_xticks(list(range(min(years), max(years) + 1))) + ax.tick_params(axis="x", labelrotation=45) + + peak = max(max(annual_values), max(rolling_values)) + top_limit = peak * 2 + milestones_in_range = sorted( + [m for m in milestones if min(years) <= m["year"] <= max(years)], + key=lambda m: (m["year"], m["description"]), + ) + milestone_offsets: Dict[int, int] = {} + offset_step = 0.4 + max_offset = 0 + for milestone in milestones_in_range: + year = milestone["year"] + count_for_year = milestone_offsets.get(year, 0) + milestone_offsets[year] = count_for_year + 1 + horizontal_offset = offset_step * (count_for_year // 2 + 1) + max_offset = max(max_offset, count_for_year) + if count_for_year % 2 == 1: + horizontal_offset *= -1 + text_x = year + horizontal_offset + ax.axvline(year, color="#d62728", linestyle="--", linewidth=1, alpha=0.65) + ax.text( + text_x, + top_limit, + milestone["description"], + rotation=90, + verticalalignment="top", + horizontalalignment="center", + fontsize=8, + color="#d62728", + ) + + ax.set_ylim(0, top_limit * (1 + max_offset * 0.02)) + ax.legend(loc="upper left", bbox_to_anchor=(1.12, 1)) + + ensure_parent_dir(destination_path) + fig.tight_layout() + fig.savefig(destination_path, dpi=150) + plt.close(fig) diff --git a/lib/plots/sets_per_year.py b/lib/plots/sets_per_year.py new file mode 100644 index 0000000..9f739a2 --- /dev/null +++ b/lib/plots/sets_per_year.py @@ -0,0 +1,196 @@ +"""Graphiques montrant le nombre de sets sortis par année.""" + +from pathlib import Path +from typing import Dict, Iterable, List, Tuple + +import matplotlib.pyplot as plt + +from lib.filesystem import ensure_parent_dir +from lib.milestones import load_milestones +from lib.rebrickable.stats import read_rows + + +def compute_sets_per_year(rows: Iterable[dict]) -> List[Tuple[int, int]]: + """Retourne la liste (année, nombre de sets) triée chronologiquement.""" + counts: Dict[int, int] = {} + for row in rows: + year = int(row["year"]) + counts[year] = counts.get(year, 0) + 1 + return sorted(counts.items(), key=lambda item: item[0]) + + +def compute_parts_per_year(rows: Iterable[dict]) -> List[Tuple[int, int]]: + """Retourne la liste (année, total de pièces) triée chronologiquement.""" + totals: Dict[int, int] = {} + for row in rows: + year = int(row["year"]) + totals[year] = totals.get(year, 0) + int(row["num_parts"]) + return sorted(totals.items(), key=lambda item: item[0]) + + +def plot_sets_per_year( + enriched_sets_path: Path, + milestones_path: Path, + destination_path: Path, +) -> None: + """Génère un histogramme annuel avec la moyenne cumulative et les jalons.""" + sets_rows = read_rows(enriched_sets_path) + milestones = load_milestones(milestones_path) + raw_series = compute_sets_per_year(sets_rows) + raw_parts_series = compute_parts_per_year(sets_rows) + min_year = min(year for year, _ in raw_series) + max_year = max(year for year, _ in raw_series) + series = [(year, dict(raw_series).get(year, 0)) for year in range(min_year, max_year + 1)] + parts_series = [(year, dict(raw_parts_series).get(year, 0)) for year in range(min_year, max_year + 1)] + years = [year for year, _ in series] + counts = [count for _, count in series] + parts_totals = [total for _, total in parts_series] + owned_counts_map: Dict[int, int] = {} + owned_parts_map: Dict[int, int] = {} + for row in sets_rows: + year = int(row["year"]) + if row["in_collection"] == "true": + owned_counts_map[year] = owned_counts_map.get(year, 0) + 1 + owned_parts_map[year] = owned_parts_map.get(year, 0) + int(row["num_parts"]) + owned_counts = [owned_counts_map.get(year, 0) for year in years] + missing_counts = [total - owned for total, owned in zip(counts, owned_counts)] + owned_parts = [owned_parts_map.get(year, 0) for year in years] + missing_parts = [total - owned for total, owned in zip(parts_totals, owned_parts)] + first_non_zero_index = next(index for index, value in enumerate(counts) if value > 0) + cumulative_mean = [] + total = 0 + for index, count in enumerate(counts): + total += count + cumulative_mean.append(total / (index + 1)) + cumulative_parts_mean = [] + rolling_sets = 0 + rolling_parts = 0 + for index, (count, parts) in enumerate(zip(counts, parts_totals)): + rolling_sets += count + rolling_parts += parts + if index < first_non_zero_index: + cumulative_parts_mean.append(0) + else: + cumulative_parts_mean.append(rolling_parts / rolling_sets) + + milestones_in_range = sorted( + [m for m in milestones if min_year <= m["year"] <= max_year], + key=lambda m: (m["year"], m["description"]), + ) + + fig, ax = plt.subplots(figsize=(14, 6)) + bar_width = 0.35 + x_sets = [year - bar_width / 2 for year in years] + bars_owned_sets = ax.bar( + x_sets, + owned_counts, + width=bar_width, + color="#1f77b4", + alpha=0.9, + label="Sets possédés", + zorder=2, + ) + bars_missing_sets = ax.bar( + x_sets, + missing_counts, + width=bar_width, + bottom=owned_counts, + color="#9ecae1", + alpha=0.8, + label="Sets non possédés", + ) + set_mean_line = ax.plot( + years, + cumulative_mean, + color="#ff7f0e", + marker="o", + label="Moyenne cumulative (sets)", + zorder=5, + ) + ax2 = ax.twinx() + x_parts = [year + bar_width / 2 for year in years] + parts_bars_owned = ax2.bar( + x_parts, + owned_parts, + width=bar_width, + color="#2ca02c", + alpha=0.9, + label="Pièces (sets possédés)", + zorder=2, + ) + parts_bars_missing = ax2.bar( + x_parts, + missing_parts, + width=bar_width, + bottom=owned_parts, + color="#c7e9c0", + alpha=0.85, + label="Pièces (sets non possédés)", + ) + parts_mean_line = ax2.plot( + years, + cumulative_parts_mean, + color="#9467bd", + marker="^", + label="Moyenne cumulative (pièces/set)", + zorder=6, + ) + parts_peak = max(parts_totals + [1]) + ax2.set_ylim(0, parts_peak * 1.1) + ax.set_xlabel("Année") + ax.set_ylabel("Nombre de sets") + ax2.set_ylabel("Nombre de pièces") + ax.set_title("Nombre de sets par année (thèmes filtrés)") + ax.grid(True, linestyle="--", alpha=0.3) + ax.set_xlim(min_year - 1, max_year + 0.4) + ax.set_xticks(list(range(min_year, max_year + 1))) + ax.tick_params(axis="x", labelrotation=45) + + peak = max(max(counts), max(cumulative_mean)) + top_limit = peak * 2 + milestone_offsets: Dict[int, int] = {} + offset_step = 0.3 + max_offset = 0 + for milestone in milestones_in_range: + year = milestone["year"] + count_for_year = milestone_offsets.get(year, 0) + milestone_offsets[year] = count_for_year + 1 + max_offset = max(max_offset, count_for_year) + horizontal_offset = offset_step * (count_for_year // 2 + 1) + if count_for_year % 2 == 1: + horizontal_offset *= -1 + text_x = year + horizontal_offset + ax.axvline(year, color="#d62728", linestyle="--", linewidth=1, alpha=0.65) + ax.text( + text_x, + top_limit, + milestone["description"], + rotation=90, + verticalalignment="top", + horizontalalignment="center", + fontsize=8, + color="#d62728", + ) + + ax.set_ylim(0, top_limit * (1 + max_offset * 0.02)) + handles = [ + bars_owned_sets, + bars_missing_sets, + parts_bars_owned, + parts_bars_missing, + set_mean_line[0], + parts_mean_line[0], + ] + labels = [ + "Sets possédés", + "Sets non possédés", + "Pièces (sets possédés)", + "Pièces (sets non possédés)", + "Moyenne cumulative (sets)", + "Moyenne cumulative (pièces/set)", + ] + ax.legend(handles, labels, loc="upper left", bbox_to_anchor=(1.12, 1)) + ensure_parent_dir(destination_path) + fig.tight_layout() + fig.savefig(destination_path, dpi=150) + plt.close(fig) diff --git a/lib/rebrickable/__init__.py b/lib/rebrickable/__init__.py new file mode 100644 index 0000000..0ce53d6 --- /dev/null +++ b/lib/rebrickable/__init__.py @@ -0,0 +1 @@ +"""Fonctionnalités liées aux données Rebrickable.""" diff --git a/lib/rebrickable/downloader.py b/lib/rebrickable/downloader.py new file mode 100644 index 0000000..2cdbc86 --- /dev/null +++ b/lib/rebrickable/downloader.py @@ -0,0 +1,47 @@ +"""Outils de téléchargement pour les fichiers fournis par Rebrickable.""" + +from datetime import datetime, timedelta +from pathlib import Path +from typing import Iterable, List +import gzip +import shutil + +import requests + + +REBRICKABLE_BASE_URL = "https://cdn.rebrickable.com/media/downloads/" +CHUNK_SIZE = 8192 +CACHE_TTL = 7 + + +def build_rebrickable_url(file_name: str) -> str: + """Construit l'URL complète d'un fichier Rebrickable à partir de son nom.""" + return f"{REBRICKABLE_BASE_URL}{file_name}" + + +def download_rebrickable_file(file_name: str, destination_dir: Path) -> Path: + """Télécharge un fichier Rebrickable, le décompresse et supprime l'archive.""" + target_path = destination_dir / file_name + destination_dir.mkdir(parents=True, exist_ok=True) + decompressed_path = target_path.with_suffix("") + if decompressed_path.exists(): + cache_age = datetime.now() - datetime.fromtimestamp(decompressed_path.stat().st_mtime) + if cache_age <= timedelta(days=CACHE_TTL): + if target_path.exists(): + target_path.unlink() + return decompressed_path + response = requests.get(build_rebrickable_url(file_name), stream=True) + response.raise_for_status() + with target_path.open("wb") as target_file: + for chunk in response.iter_content(chunk_size=CHUNK_SIZE): + target_file.write(chunk) + with gzip.open(target_path, "rb") as compressed_file: + with decompressed_path.open("wb") as decompressed_file: + shutil.copyfileobj(compressed_file, decompressed_file) + target_path.unlink() + return decompressed_path + + +def download_rebrickable_files(file_names: Iterable[str], destination_dir: Path) -> List[Path]: + """Télécharge en série plusieurs fichiers compressés fournis par Rebrickable.""" + return [download_rebrickable_file(file_name, destination_dir) for file_name in file_names] diff --git a/lib/rebrickable/enrich_sets.py b/lib/rebrickable/enrich_sets.py new file mode 100644 index 0000000..a1aacbc --- /dev/null +++ b/lib/rebrickable/enrich_sets.py @@ -0,0 +1,86 @@ +"""Enrichissement des sets LEGO avec des métadonnées Rebrickable et personnelles.""" + +import csv +from pathlib import Path +from typing import Iterable, Set + +from lib.filesystem import ensure_parent_dir + +REBRICKABLE_SET_BASE_URL = "https://rebrickable.com/sets/" + + +def extract_set_id(set_num: str) -> str: + """Extrait l'identifiant LEGO (partie avant la révision) depuis set_num.""" + return set_num.split("-", 1)[0] + + +def build_rebrickable_set_url(set_num: str) -> str: + """Construit l'URL publique Rebrickable d'un set.""" + return f"{REBRICKABLE_SET_BASE_URL}{set_num}" + + +def parse_set_collection_root(raw_value: str) -> Path | None: + """Prépare le chemin de collection, ou None si aucune collection n'est fournie.""" + cleaned = raw_value.strip() + if not cleaned: + print("La variable MY_SETS est vide, aucun set en collection.") + return None + return Path(cleaned) + + +def load_owned_set_ids(collection_root: Path) -> Set[str]: + """Retourne l'ensemble des identifiants de sets présents dans un dossier de collection.""" + if not collection_root.exists(): + print(f"Le dossier {collection_root} n'existe pas, aucun set en collection.") + return set() + if not collection_root.is_dir(): + print(f"Le chemin {collection_root} n'est pas un dossier, aucun set en collection.") + return set() + entries = [path for path in collection_root.iterdir() if path.is_dir()] + if not entries: + print(f"Le dossier {collection_root} est vide, aucun set en collection.") + return set() + return {entry.name for entry in entries} + + +def enrich_sets( + source_path: Path, + destination_path: Path, + owned_set_ids: Iterable[str], +) -> None: + """Ajoute les colonnes set_id, rebrickable_url et in_collection au catalogue filtré.""" + ensure_parent_dir(destination_path) + owned_lookup = set(owned_set_ids) + with source_path.open() as source_file, destination_path.open("w", newline="") as target_file: + reader = csv.DictReader(source_file) + fieldnames = reader.fieldnames + ["set_id", "rebrickable_url", "in_collection"] + writer = csv.DictWriter(target_file, fieldnames=fieldnames) + writer.writeheader() + for row in reader: + set_id = extract_set_id(row["set_num"]) + writer.writerow( + { + **row, + "set_id": set_id, + "rebrickable_url": build_rebrickable_set_url(row["set_num"]), + "in_collection": str(set_id in owned_lookup).lower(), + } + ) + + +def write_missing_sets_markdown(enriched_path: Path, destination_path: Path) -> None: + """Génère un tableau Markdown listant les sets non possédés.""" + with enriched_path.open() as source_file: + reader = csv.DictReader(source_file) + rows = [ + row + for row in reader + if row["in_collection"] == "false" + ] + ensure_parent_dir(destination_path) + with destination_path.open("w") as target_file: + target_file.write("| set_id | year | name |\n") + target_file.write("| --- | --- | --- |\n") + for row in rows: + link = f"[{row['set_id']}]({row['rebrickable_url']})" + target_file.write(f"| {link} | {row['year']} | {row['name']} |\n") diff --git a/lib/rebrickable/filter_sets.py b/lib/rebrickable/filter_sets.py new file mode 100644 index 0000000..b2f2452 --- /dev/null +++ b/lib/rebrickable/filter_sets.py @@ -0,0 +1,41 @@ +"""Filtrage des sets LEGO par identifiants de thèmes Rebrickable.""" + +import csv +from pathlib import Path +from typing import Dict, Iterable, List + +from lib.filesystem import ensure_parent_dir + +def parse_theme_ids(raw_value: str) -> List[str]: + """Extrait les identifiants de thèmes depuis une chaîne séparée par des virgules.""" + values = [value.strip() for value in raw_value.split(",") if value.strip()] + if not values: + raise ValueError("Au moins un identifiant de thème est requis.") + return values + + +def filter_sets_by_theme( + source_path: Path, + destination_path: Path, + theme_ids: Iterable[str], + overrides_path: Path, +) -> None: + """Filtre le catalogue des sets en conservant uniquement les thèmes ciblés avec pièces.""" + ensure_parent_dir(destination_path) + allowed_ids = set(theme_ids) + overrides = load_num_parts_overrides(overrides_path) + with source_path.open() as source_file, destination_path.open("w", newline="") as target_file: + reader = csv.DictReader(source_file) + writer = csv.DictWriter(target_file, fieldnames=reader.fieldnames) + writer.writeheader() + for row in reader: + if row["theme_id"] in allowed_ids and int(row["num_parts"]) > 0: + override = overrides.get(row["set_num"]) + writer.writerow({**row, "num_parts": override if override is not None else row["num_parts"]}) + + +def load_num_parts_overrides(overrides_path: Path) -> Dict[str, str]: + """Charge les corrections de nombre de pièces par set.""" + with overrides_path.open() as overrides_file: + reader = csv.DictReader(overrides_file) + return {row["set_num"]: row["num_parts"] for row in reader} diff --git a/lib/rebrickable/inventory_reconciliation.py b/lib/rebrickable/inventory_reconciliation.py new file mode 100644 index 0000000..fa6f070 --- /dev/null +++ b/lib/rebrickable/inventory_reconciliation.py @@ -0,0 +1,107 @@ +"""Rapport des écarts entre catalogue et inventaire agrégé.""" + +import csv +from pathlib import Path +from typing import Dict, Iterable, List + +from lib.filesystem import ensure_parent_dir + +def load_sets(sets_path: Path) -> List[dict]: + """Charge les sets filtrés pour l'analyse.""" + with sets_path.open() as sets_file: + reader = csv.DictReader(sets_file) + return list(reader) + + +def index_sets_by_num(sets: Iterable[dict]) -> Dict[str, dict]: + """Crée un index des sets par numéro complet.""" + return {row["set_num"]: row for row in sets} + + +def compute_inventory_totals(parts_path: Path, include_spares: bool) -> Dict[str, int]: + """Calcule le total de pièces par set, avec ou sans rechanges.""" + totals: Dict[str, int] = {} + with parts_path.open() as parts_file: + reader = csv.DictReader(parts_file) + for row in reader: + if not include_spares and row["is_spare"] == "true": + continue + set_num = row["set_num"] + totals[set_num] = totals.get(set_num, 0) + int(row["quantity_in_set"]) + return totals + + +def compute_inventory_gaps(sets_path: Path, parts_path: Path) -> List[dict]: + """Liste les sets dont le total de pièces diffère du catalogue.""" + sets = load_sets(sets_path) + totals_with_spares = compute_inventory_totals(parts_path, include_spares=True) + totals_without_spares = compute_inventory_totals(parts_path, include_spares=False) + gaps: List[dict] = [] + for set_row in sets: + expected_parts = int(set_row["num_parts"]) + inventory_parts_with_spares = totals_with_spares[set_row["set_num"]] + inventory_parts_non_spare = totals_without_spares[set_row["set_num"]] + if expected_parts != inventory_parts_with_spares: + gaps.append( + { + "set_num": set_row["set_num"], + "set_id": set_row["set_id"], + "expected_parts": expected_parts, + "inventory_parts": inventory_parts_with_spares, + "inventory_parts_non_spare": inventory_parts_non_spare, + "delta": abs(expected_parts - inventory_parts_with_spares), + "delta_non_spare": abs(expected_parts - inventory_parts_non_spare), + "in_collection": set_row["in_collection"], + } + ) + return gaps + + +def write_inventory_gaps_csv(destination_path: Path, gaps: Iterable[dict]) -> None: + """Écrit un CSV listant les sets en écart d'inventaire.""" + ensure_parent_dir(destination_path) + with destination_path.open("w", newline="") as csv_file: + fieldnames = [ + "set_num", + "set_id", + "expected_parts", + "inventory_parts", + "inventory_parts_non_spare", + "delta", + "delta_non_spare", + "in_collection", + ] + writer = csv.DictWriter(csv_file, fieldnames=fieldnames) + writer.writeheader() + for row in gaps: + writer.writerow(row) + + +def build_instructions_url(set_id: str) -> str: + """Construit un lien direct vers la page d'instructions LEGO du set.""" + return f"https://www.lego.com/service/buildinginstructions/{set_id}" + + +def write_inventory_gaps_markdown( + destination_path: Path, + gaps: Iterable[dict], + sets_by_num: Dict[str, dict], +) -> None: + """Génère un tableau Markdown listant les sets en écart d'inventaire.""" + ensure_parent_dir(destination_path) + with destination_path.open("w") as markdown_file: + markdown_file.write( + "| set_id | name | year | delta (spares inclus) | delta (spares exclus) | expected_parts | inventory_parts | inventory_parts_non_spare | in_collection | instructions |\n" + ) + markdown_file.write("| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |\n") + for row in gaps: + if row["delta_non_spare"] == 0: + continue + set_row = sets_by_num[row["set_num"]] + set_link = f"[{row['set_id']}]({set_row['rebrickable_url']})" + instructions_link = f"[PDF]({build_instructions_url(row['set_id'])})" + markdown_file.write( + f"| {set_link} | {set_row['name']} | {set_row['year']} | {row['delta']} | {row['delta_non_spare']} | " + f"{row['expected_parts']} | {row['inventory_parts']} | {row['inventory_parts_non_spare']} | " + f"{row['in_collection']} | {instructions_link} |\n" + ) diff --git a/lib/rebrickable/parts_inventory.py b/lib/rebrickable/parts_inventory.py new file mode 100644 index 0000000..823a5b1 --- /dev/null +++ b/lib/rebrickable/parts_inventory.py @@ -0,0 +1,143 @@ +"""Construction d'un inventaire détaillé des pièces par set.""" + +import csv +from pathlib import Path +from typing import Dict, List + +from lib.filesystem import ensure_parent_dir + + +def normalize_boolean(raw_value: str) -> str: + """Normalise une valeur booléenne en chaîne lowercase.""" + return raw_value.lower() + + +def select_latest_inventories(inventories_path: Path) -> Dict[str, dict]: + """Retient pour chaque set l'inventaire avec la version la plus élevée.""" + latest_inventories: Dict[str, dict] = {} + with inventories_path.open() as inventories_file: + reader = csv.DictReader(inventories_file) + for row in reader: + current = latest_inventories.get(row["set_num"]) + if current is None or int(row["version"]) > int(current["version"]): + latest_inventories[row["set_num"]] = {"id": row["id"], "version": row["version"]} + return latest_inventories + + +def build_color_lookup(colors_path: Path) -> Dict[str, dict]: + """Construit un index des couleurs par identifiant.""" + colors: Dict[str, dict] = {} + with colors_path.open() as colors_file: + reader = csv.DictReader(colors_file) + for row in reader: + colors[row["id"]] = { + "rgb": row["rgb"], + "is_translucent": normalize_boolean(row["is_trans"]), + } + return colors + + +def index_inventory_parts_by_inventory(inventory_parts_path: Path) -> Dict[str, List[dict]]: + """Indexe les lignes d'inventaire par identifiant d'inventaire.""" + parts_by_inventory: Dict[str, List[dict]] = {} + with inventory_parts_path.open() as parts_file: + reader = csv.DictReader(parts_file) + for row in reader: + inventory_id = row["inventory_id"] + if inventory_id not in parts_by_inventory: + parts_by_inventory[inventory_id] = [] + parts_by_inventory[inventory_id].append(row) + return parts_by_inventory + + +def index_inventory_minifigs_by_inventory(inventory_minifigs_path: Path) -> Dict[str, List[dict]]: + """Indexe les minifigs par inventaire.""" + minifigs_by_inventory: Dict[str, List[dict]] = {} + with inventory_minifigs_path.open() as minifigs_file: + reader = csv.DictReader(minifigs_file) + for row in reader: + inventory_id = row["inventory_id"] + if inventory_id not in minifigs_by_inventory: + minifigs_by_inventory[inventory_id] = [] + minifigs_by_inventory[inventory_id].append(row) + return minifigs_by_inventory + + +def build_minifig_lookup(minifigs_path: Path) -> Dict[str, dict]: + """Construit un index des minifigs avec leur nombre de pièces.""" + minifigs: Dict[str, dict] = {} + with minifigs_path.open() as minifigs_file: + reader = csv.DictReader(minifigs_file) + for row in reader: + minifigs[row["fig_num"]] = row + return minifigs + + +def write_parts_filtered( + sets_path: Path, + inventories_path: Path, + inventory_parts_path: Path, + colors_path: Path, + inventory_minifigs_path: Path, + minifigs_path: Path, + destination_path: Path, +) -> None: + """Assemble un CSV agrégé listant les pièces par set et par couleur.""" + latest_inventories = select_latest_inventories(inventories_path) + parts_by_inventory = index_inventory_parts_by_inventory(inventory_parts_path) + minifigs_by_inventory = index_inventory_minifigs_by_inventory(inventory_minifigs_path) + minifigs = build_minifig_lookup(minifigs_path) + colors = build_color_lookup(colors_path) + ensure_parent_dir(destination_path) + with sets_path.open() as sets_file, destination_path.open("w", newline="") as target_file: + sets_reader = csv.DictReader(sets_file) + fieldnames = [ + "part_num", + "color_rgb", + "is_translucent", + "set_num", + "set_id", + "quantity_in_set", + "is_spare", + ] + writer = csv.DictWriter(target_file, fieldnames=fieldnames) + writer.writeheader() + for set_row in sets_reader: + inventory = latest_inventories[set_row["set_num"]] + inventory_parts = parts_by_inventory[inventory["id"]] + inventory_total_non_spare = sum( + int(part_row["quantity"]) + for part_row in inventory_parts + if normalize_boolean(part_row["is_spare"]) == "false" + ) + expected_parts = int(set_row["num_parts"]) + for part_row in inventory_parts: + color = colors[part_row["color_id"]] + writer.writerow( + { + "part_num": part_row["part_num"], + "color_rgb": color["rgb"], + "is_translucent": color["is_translucent"], + "set_num": set_row["set_num"], + "set_id": set_row["set_id"], + "quantity_in_set": part_row["quantity"], + "is_spare": normalize_boolean(part_row["is_spare"]), + } + ) + if inventory_total_non_spare < expected_parts: + for minifig_row in minifigs_by_inventory.get(inventory["id"], []): + minifig_inventory = latest_inventories[minifig_row["fig_num"]] + minifig_parts = parts_by_inventory[minifig_inventory["id"]] + for part_row in minifig_parts: + color = colors[part_row["color_id"]] + writer.writerow( + { + "part_num": part_row["part_num"], + "color_rgb": color["rgb"], + "is_translucent": color["is_translucent"], + "set_num": set_row["set_num"], + "set_id": set_row["set_id"], + "quantity_in_set": str(int(part_row["quantity"]) * int(minifig_row["quantity"])), + "is_spare": normalize_boolean(part_row["is_spare"]), + } + ) diff --git a/lib/rebrickable/parts_stats.py b/lib/rebrickable/parts_stats.py new file mode 100644 index 0000000..960589b --- /dev/null +++ b/lib/rebrickable/parts_stats.py @@ -0,0 +1,101 @@ +"""Calculs de statistiques simples sur les pièces filtrées.""" + +import csv +from collections import defaultdict +from pathlib import Path +from typing import Dict, Iterable, List, Sequence, Tuple + +from lib.filesystem import ensure_parent_dir +from lib.rebrickable.inventory_reconciliation import compute_inventory_gaps +from lib.rebrickable.stats import read_rows as read_stats_rows + + +def read_rows(path: Path) -> List[dict]: + """Charge un fichier CSV en mémoire sous forme de dictionnaires.""" + with path.open() as csv_file: + reader = csv.DictReader(csv_file) + return list(reader) + + +def select_non_spare_parts(rows: Iterable[dict]) -> List[dict]: + """Filtre les pièces en excluant les rechanges.""" + return [row for row in rows if row["is_spare"] == "false"] + + +def variation_key(row: dict) -> Tuple[str, str, str]: + """Clé d'unicité pour une variation de pièce (référence + couleur).""" + return (row["part_num"], row["color_rgb"], row["is_translucent"]) + + +def color_key(row: dict) -> Tuple[str, str]: + """Clé d'unicité pour une couleur.""" + return (row["color_rgb"], row["is_translucent"]) + + +def aggregate_quantities_by_variation(rows: Iterable[dict]) -> Dict[Tuple[str, str, str], int]: + """Calcule la quantité totale par variation de pièce (hors rechanges).""" + quantities: Dict[Tuple[str, str, str], int] = defaultdict(int) + for row in rows: + quantities[variation_key(row)] += int(row["quantity_in_set"]) + return quantities + + +def read_total_filtered_parts(stats_path: Path) -> int: + """Lit le total de pièces attendu pour les thèmes filtrés depuis stats.csv.""" + rows = read_stats_rows(stats_path) + return int( + next(row["valeur"] for row in rows if row["libelle"] == "Total de pièces pour les thèmes filtrés") + ) + + +def build_stats( + rows: Iterable[dict], + sets_path: Path, + parts_path: Path, + stats_path: Path, +) -> List[Tuple[str, str]]: + """Construit les statistiques principales sur les pièces filtrées et les écarts d'inventaire.""" + non_spares = select_non_spare_parts(rows) + quantities = aggregate_quantities_by_variation(non_spares) + total_variations = len(quantities) + color_set = {color_key(row) for row in non_spares} + least_used_key = min(quantities, key=quantities.get) + most_used_key = max(quantities, key=quantities.get) + least_used = quantities[least_used_key] + most_used = quantities[most_used_key] + total_non_spare = sum(quantities.values()) + gaps = compute_inventory_gaps(sets_path, parts_path) + gap_count = len(gaps) + worst_gap = max(gaps, key=lambda gap: gap["delta"]) if gap_count > 0 else {"set_id": "none", "delta": 0} + catalog_total_parts = read_total_filtered_parts(stats_path) + catalog_inventory_delta = catalog_total_parts - total_non_spare + + return [ + ("Total de variations de pièces (hors rechanges)", str(total_variations)), + ( + "Pièce la moins utilisée (référence + couleur)", + f"{least_used_key[0]} / {least_used_key[1]} / {least_used_key[2]} ({least_used})", + ), + ( + "Pièce la plus commune (référence + couleur)", + f"{most_used_key[0]} / {most_used_key[1]} / {most_used_key[2]} ({most_used})", + ), + ("Total de couleurs utilisées (hors rechanges)", str(len(color_set))), + ("Total de pièces hors rechanges", str(total_non_spare)), + ( + "Ecart total catalogue (stats) - inventaire (hors rechanges)", + str(catalog_inventory_delta), + ), + ("Nombre de sets en écart inventaire/catalogue", str(gap_count)), + ("Ecart maximal inventaire/catalogue", f"{worst_gap['set_id']} ({worst_gap['delta']})"), + ] + + +def write_parts_stats(destination_path: Path, stats: Sequence[Tuple[str, str]]) -> None: + """Écrit les statistiques dans un CSV à deux colonnes.""" + ensure_parent_dir(destination_path) + with destination_path.open("w", newline="") as csv_file: + writer = csv.writer(csv_file) + writer.writerow(["libelle", "valeur"]) + for label, value in stats: + writer.writerow([label, value]) diff --git a/lib/rebrickable/stats.py b/lib/rebrickable/stats.py new file mode 100644 index 0000000..d259060 --- /dev/null +++ b/lib/rebrickable/stats.py @@ -0,0 +1,122 @@ +"""Calcul des statistiques de base sur les sets LEGO filtrés.""" + +import csv +from pathlib import Path +from typing import Iterable, List, Sequence, Tuple + +from lib.filesystem import ensure_parent_dir + +def read_rows(path: Path) -> List[dict]: + """Charge un fichier CSV en mémoire sous forme de dictionnaires.""" + with path.open() as csv_file: + reader = csv.DictReader(csv_file) + return list(reader) + + +def write_stats_csv(destination_path: Path, stats: Sequence[Tuple[str, str]]) -> None: + """Écrit les statistiques dans un CSV à deux colonnes.""" + ensure_parent_dir(destination_path) + with destination_path.open("w", newline="") as csv_file: + writer = csv.writer(csv_file) + writer.writerow(["libelle", "valeur"]) + for label, value in stats: + writer.writerow([label, value]) + + +def compute_median(values: List[int]) -> float: + """Calcule la médiane d'une liste de valeurs entières.""" + sorted_values = sorted(values) + middle = len(sorted_values) // 2 + if len(sorted_values) % 2 == 1: + return float(sorted_values[middle]) + return (sorted_values[middle - 1] + sorted_values[middle]) / 2 + + +def compute_basic_stats( + themes: Iterable[dict], + all_sets: Iterable[dict], + filtered_sets: Iterable[dict], + enriched_sets: Iterable[dict], +) -> List[Tuple[str, str]]: + """Calcule les statistiques principales à partir des sets chargés.""" + themes_list = list(themes) + all_sets_list = list(all_sets) + filtered_sets_list = list(filtered_sets) + enriched_sets_list = list(enriched_sets) + + theme_count_total = len(themes_list) + total_sets = len(all_sets_list) + filtered_sets_count = len(filtered_sets_list) + avg_sets_per_theme = total_sets / theme_count_total + percent_filtered = (filtered_sets_count / total_sets) * 100 + owned_sets_count = sum(1 for row in enriched_sets_list if row["in_collection"] == "true") + missing_sets_count = sum(1 for row in enriched_sets_list if row["in_collection"] == "false") + percent_owned = (owned_sets_count / filtered_sets_count) * 100 + parts_per_set = [int(row["num_parts"]) for row in filtered_sets_list] + avg_parts_per_set = sum(parts_per_set) / filtered_sets_count + median_parts_per_set = compute_median(parts_per_set) + years = [int(row["year"]) for row in filtered_sets_list] + avg_sets_per_year = filtered_sets_count / len(set(years)) + total_parts = sum(parts_per_set) + theme_ids_filtered = {row["theme_id"] for row in filtered_sets_list} + min_year = str(min(years)) + max_year = str(max(years)) + year_counts = {} + for year in years: + year_counts[year] = year_counts.get(year, 0) + 1 + prolific_year, prolific_count = max(year_counts.items(), key=lambda item: (item[1], -item[0])) + richest_set = max(filtered_sets_list, key=lambda row: int(row["num_parts"])) + lightest_set = min(filtered_sets_list, key=lambda row: int(row["num_parts"])) + oldest_set = min(filtered_sets_list, key=lambda row: (int(row["year"]), row["set_num"])) + latest_set = max(filtered_sets_list, key=lambda row: (int(row["year"]), row["set_num"])) + owned_parts = [int(row["num_parts"]) for row in enriched_sets_list if row["in_collection"] == "true"] + missing_parts = [int(row["num_parts"]) for row in enriched_sets_list if row["in_collection"] == "false"] + avg_parts_owned = sum(owned_parts) / len(owned_parts) + avg_parts_missing = sum(missing_parts) / len(missing_parts) + total_parts_owned = sum(owned_parts) + percent_parts_owned = (total_parts_owned / total_parts) * 100 + + return [ + ("Nombre total de sets (catalogue complet)", str(total_sets)), + ("Nombre total de thèmes (catalogue complet)", str(theme_count_total)), + ("Nombre de sets après filtrage (thèmes ciblés)", str(filtered_sets_count)), + ("Nombre moyen de sets par thème (catalogue complet)", f"{avg_sets_per_theme:.2f}"), + ("Pourcentage des sets filtrés vs total", f"{percent_filtered:.2f}%"), + ("Taux de possession (thèmes filtrés)", f"{percent_owned:.2f}%"), + ("Sets dans la collection", str(owned_sets_count)), + ("Sets manquants pour la collection", str(missing_sets_count)), + ("Nombre moyen de pièces par set (thèmes filtrés)", f"{avg_parts_per_set:.2f}"), + ("Médiane de pièces par set (thèmes filtrés)", f"{median_parts_per_set:.2f}"), + ("Nombre moyen de sets commercialisés par an (thèmes filtrés)", f"{avg_sets_per_year:.2f}"), + ("Total de pièces pour les thèmes filtrés", str(total_parts)), + ("Total de pièces des sets possédés", str(total_parts_owned)), + ("Pourcentage de pièces possédées (thèmes filtrés)", f"{percent_parts_owned:.2f}%"), + ("Nombre de thèmes filtrés", str(len(theme_ids_filtered))), + ("Première année de sortie (thèmes filtrés)", min_year), + ("Dernière année de sortie (thèmes filtrés)", max_year), + ("Année la plus prolifique (thèmes filtrés)", f"{prolific_year} ({prolific_count} sets)"), + ( + "Set avec le plus de pièces (thèmes filtrés)", + f"{richest_set['set_num']} - {richest_set['name']} ({richest_set['num_parts']} pièces)", + ), + ( + "Set avec le moins de pièces (thèmes filtrés)", + f"{lightest_set['set_num']} - {lightest_set['name']} ({lightest_set['num_parts']} pièces)", + ), + ( + "Set le plus ancien (thèmes filtrés)", + f"{oldest_set['set_num']} - {oldest_set['name']} ({oldest_set['year']})", + ), + ( + "Set le plus récent (thèmes filtrés)", + f"{latest_set['set_num']} - {latest_set['name']} ({latest_set['year']})", + ), + ( + "Nombre moyen de pièces des sets possédés", + f"{avg_parts_owned:.2f}", + ), + ( + "Nombre moyen de pièces des sets manquants", + f"{avg_parts_missing:.2f}", + ), + ] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..171492d --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +matplotlib +python-dotenv +pytest +requests +responses +colorspacious diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 0000000..eaf7c45 --- /dev/null +++ b/scripts/__init__.py @@ -0,0 +1 @@ +"""Scripts d'orchestration pour préparer les données LEGO.""" diff --git a/scripts/build_parts_inventory.py b/scripts/build_parts_inventory.py new file mode 100644 index 0000000..17c6db6 --- /dev/null +++ b/scripts/build_parts_inventory.py @@ -0,0 +1,31 @@ +"""Assemble un inventaire des pièces par set et par couleur.""" + +from pathlib import Path + +from lib.rebrickable.parts_inventory import write_parts_filtered + + +SETS_PATH = Path("data/intermediate/sets_enriched.csv") +INVENTORIES_PATH = Path("data/raw/inventories.csv") +INVENTORY_PARTS_PATH = Path("data/raw/inventory_parts.csv") +COLORS_PATH = Path("data/raw/colors.csv") +INVENTORY_MINIFIGS_PATH = Path("data/raw/inventory_minifigs.csv") +MINIFIGS_PATH = Path("data/raw/minifigs.csv") +DESTINATION_PATH = Path("data/intermediate/parts_filtered.csv") + + +def main() -> None: + """Génère le fichier parts_filtered.csv prêt pour les analyses suivantes.""" + write_parts_filtered( + SETS_PATH, + INVENTORIES_PATH, + INVENTORY_PARTS_PATH, + COLORS_PATH, + INVENTORY_MINIFIGS_PATH, + MINIFIGS_PATH, + DESTINATION_PATH, + ) + + +if __name__ == "__main__": + main() diff --git a/scripts/compute_parts_stats.py b/scripts/compute_parts_stats.py new file mode 100644 index 0000000..98665f7 --- /dev/null +++ b/scripts/compute_parts_stats.py @@ -0,0 +1,22 @@ +"""Calcule des statistiques simples sur les pièces filtrées.""" + +from pathlib import Path + +from lib.rebrickable.parts_stats import read_rows, build_stats, write_parts_stats + + +PARTS_PATH = Path("data/intermediate/parts_filtered.csv") +SETS_PATH = Path("data/intermediate/sets_enriched.csv") +STATS_PATH = Path("data/final/stats.csv") +DESTINATION_PATH = Path("data/final/parts_stats.csv") + + +def main() -> None: + """Charge les pièces filtrées et écrit les statistiques associées.""" + rows = read_rows(PARTS_PATH) + stats = build_stats(rows, SETS_PATH, PARTS_PATH, STATS_PATH) + write_parts_stats(DESTINATION_PATH, stats) + + +if __name__ == "__main__": + main() diff --git a/scripts/compute_stats.py b/scripts/compute_stats.py new file mode 100644 index 0000000..70eabb3 --- /dev/null +++ b/scripts/compute_stats.py @@ -0,0 +1,26 @@ +"""Calcule et exporte les statistiques principales sur les sets LEGO filtrés.""" + +from pathlib import Path + +from lib.rebrickable.stats import compute_basic_stats, read_rows, write_stats_csv + + +THEMES_PATH = Path("data/raw/themes.csv") +ALL_SETS_PATH = Path("data/raw/sets.csv") +FILTERED_SETS_PATH = Path("data/intermediate/sets_filtered.csv") +ENRICHED_SETS_PATH = Path("data/intermediate/sets_enriched.csv") +DESTINATION_PATH = Path("data/final/stats.csv") + + +def main() -> None: + """Charge les données, calcule les statistiques et exporte le CSV.""" + themes = read_rows(THEMES_PATH) + all_sets = read_rows(ALL_SETS_PATH) + filtered_sets = read_rows(FILTERED_SETS_PATH) + enriched_sets = read_rows(ENRICHED_SETS_PATH) + stats = compute_basic_stats(themes, all_sets, filtered_sets, enriched_sets) + write_stats_csv(DESTINATION_PATH, stats) + + +if __name__ == "__main__": + main() diff --git a/scripts/download_parts_data.py b/scripts/download_parts_data.py new file mode 100644 index 0000000..0deb702 --- /dev/null +++ b/scripts/download_parts_data.py @@ -0,0 +1,25 @@ +"""Télécharge les fichiers nécessaires aux pièces LEGO depuis Rebrickable.""" + +from pathlib import Path + +from lib.rebrickable.downloader import download_rebrickable_files + + +FILES_TO_DOWNLOAD = [ + "inventories.csv.gz", + "inventory_parts.csv.gz", + "parts.csv.gz", + "colors.csv.gz", + "inventory_minifigs.csv.gz", + "minifigs.csv.gz" +] +DESTINATION_DIR = Path("data/raw") + + +def main() -> None: + """Lance le téléchargement des fichiers liés aux pièces LEGO.""" + download_rebrickable_files(FILES_TO_DOWNLOAD, DESTINATION_DIR) + + +if __name__ == "__main__": + main() diff --git a/scripts/download_sets.py b/scripts/download_sets.py new file mode 100644 index 0000000..453fef4 --- /dev/null +++ b/scripts/download_sets.py @@ -0,0 +1,18 @@ +"""Télécharge le catalogue des sets LEGO depuis Rebrickable.""" + +from pathlib import Path + +from lib.rebrickable.downloader import download_rebrickable_file + + +SETS_FILE_NAME = "sets.csv.gz" +DESTINATION_DIR = Path("data/raw") + + +def main() -> None: + """Lance le téléchargement du fichier des sets.""" + download_rebrickable_file(SETS_FILE_NAME, DESTINATION_DIR) + + +if __name__ == "__main__": + main() diff --git a/scripts/download_themes.py b/scripts/download_themes.py new file mode 100644 index 0000000..a3dc6d5 --- /dev/null +++ b/scripts/download_themes.py @@ -0,0 +1,18 @@ +"""Télécharge le catalogue des thèmes LEGO depuis Rebrickable.""" + +from pathlib import Path + +from lib.rebrickable.downloader import download_rebrickable_file + + +THEMES_FILE_NAME = "themes.csv.gz" +DESTINATION_DIR = Path("data/raw") + + +def main() -> None: + """Lance le téléchargement du fichier des thèmes.""" + download_rebrickable_file(THEMES_FILE_NAME, DESTINATION_DIR) + + +if __name__ == "__main__": + main() diff --git a/scripts/enrich_sets.py b/scripts/enrich_sets.py new file mode 100644 index 0000000..adcfa31 --- /dev/null +++ b/scripts/enrich_sets.py @@ -0,0 +1,31 @@ +"""Enrichit les sets filtrés avec des métadonnées et la présence en collection.""" + +import os +from pathlib import Path + +from dotenv import load_dotenv + +from lib.rebrickable.enrich_sets import ( + enrich_sets, + load_owned_set_ids, + parse_set_collection_root, + write_missing_sets_markdown, +) + + +SOURCE_PATH = Path("data/intermediate/sets_filtered.csv") +DESTINATION_PATH = Path("data/intermediate/sets_enriched.csv") +MISSING_MARKDOWN_PATH = Path("data/final/sets_missing.md") + + +def main() -> None: + """Lance l'enrichissement des sets filtrés.""" + load_dotenv() + owned_root = parse_set_collection_root(os.environ.get("MY_SETS", "")) + owned_set_ids = load_owned_set_ids(owned_root) if owned_root is not None else set() + enrich_sets(SOURCE_PATH, DESTINATION_PATH, owned_set_ids) + write_missing_sets_markdown(DESTINATION_PATH, MISSING_MARKDOWN_PATH) + + +if __name__ == "__main__": + main() diff --git a/scripts/filter_sets.py b/scripts/filter_sets.py new file mode 100644 index 0000000..3cdb3ae --- /dev/null +++ b/scripts/filter_sets.py @@ -0,0 +1,24 @@ +"""Filtre les sets LEGO pour ne conserver que les thèmes ciblés.""" + +import os +from pathlib import Path + +from dotenv import load_dotenv + +from lib.rebrickable.filter_sets import filter_sets_by_theme, parse_theme_ids + + +SOURCE_PATH = Path("data/raw/sets.csv") +DESTINATION_PATH = Path("data/intermediate/sets_filtered.csv") +OVERRIDES_PATH = Path("config/num_parts_overrides.csv") + + +def main() -> None: + """Lance le filtrage des sets à partir des identifiants définis dans l'environnement.""" + load_dotenv() + theme_ids = parse_theme_ids(os.environ["THEME_IDS"]) + filter_sets_by_theme(SOURCE_PATH, DESTINATION_PATH, theme_ids, OVERRIDES_PATH) + + +if __name__ == "__main__": + main() diff --git a/scripts/plot_colors_grid.py b/scripts/plot_colors_grid.py new file mode 100644 index 0000000..343f2bb --- /dev/null +++ b/scripts/plot_colors_grid.py @@ -0,0 +1,21 @@ +"""Génère une grille artistique des couleurs utilisées.""" + +from pathlib import Path + +from lib.plots.colors_grid import plot_colors_grid + + +PARTS_PATH = Path("data/intermediate/parts_filtered.csv") +COLORS_PATH = Path("data/raw/colors.csv") +DESTINATION_PATH = Path("figures/step12/colors_grid.png") +MINIFIG_DESTINATION_PATH = Path("figures/step12/colors_grid_minifigs.png") + + +def main() -> None: + """Construit les visuels des palettes de couleurs utilisées.""" + plot_colors_grid(PARTS_PATH, COLORS_PATH, DESTINATION_PATH, minifig_only=False) + plot_colors_grid(PARTS_PATH, COLORS_PATH, MINIFIG_DESTINATION_PATH, minifig_only=True) + + +if __name__ == "__main__": + main() diff --git a/scripts/plot_parts_per_set.py b/scripts/plot_parts_per_set.py new file mode 100644 index 0000000..2765d8f --- /dev/null +++ b/scripts/plot_parts_per_set.py @@ -0,0 +1,19 @@ +"""Trace la moyenne annuelle et glissante des pièces par set.""" + +from pathlib import Path + +from lib.plots.parts_per_set import plot_parts_per_set + + +ENRICHED_SETS_PATH = Path("data/intermediate/sets_enriched.csv") +MILESTONES_PATH = Path("config/milestones.csv") +DESTINATION_PATH = Path("figures/step07/avg_parts_per_set.png") + + +def main() -> None: + """Génère le graphique des tailles moyennes des sets.""" + plot_parts_per_set(ENRICHED_SETS_PATH, MILESTONES_PATH, DESTINATION_PATH) + + +if __name__ == "__main__": + main() diff --git a/scripts/plot_sets_per_year.py b/scripts/plot_sets_per_year.py new file mode 100644 index 0000000..15b2aff --- /dev/null +++ b/scripts/plot_sets_per_year.py @@ -0,0 +1,19 @@ +"""Trace l'évolution du nombre de sets par année (thèmes filtrés) avec jalons.""" + +from pathlib import Path + +from lib.plots.sets_per_year import plot_sets_per_year + + +ENRICHED_SETS_PATH = Path("data/intermediate/sets_enriched.csv") +MILESTONES_PATH = Path("config/milestones.csv") +DESTINATION_PATH = Path("figures/step07/sets_per_year.png") + + +def main() -> None: + """Génère le graphique des sets par année.""" + plot_sets_per_year(ENRICHED_SETS_PATH, MILESTONES_PATH, DESTINATION_PATH) + + +if __name__ == "__main__": + main() diff --git a/scripts/report_inventory_gaps.py b/scripts/report_inventory_gaps.py new file mode 100644 index 0000000..aa1c7bd --- /dev/null +++ b/scripts/report_inventory_gaps.py @@ -0,0 +1,30 @@ +"""Produit un rapport des écarts entre inventaires et catalogue.""" + +from pathlib import Path + +from lib.rebrickable.inventory_reconciliation import ( + compute_inventory_gaps, + index_sets_by_num, + load_sets, + write_inventory_gaps_csv, + write_inventory_gaps_markdown, +) + + +SETS_PATH = Path("data/intermediate/sets_enriched.csv") +PARTS_PATH = Path("data/intermediate/parts_filtered.csv") +DESTINATION_PATH = Path("data/final/inventory_gaps.csv") +MARKDOWN_PATH = Path("data/final/inventory_gaps.md") + + +def main() -> None: + """Génère le fichier d'écarts d'inventaire.""" + sets = load_sets(SETS_PATH) + sets_by_num = index_sets_by_num(sets) + gaps = compute_inventory_gaps(SETS_PATH, PARTS_PATH) + write_inventory_gaps_csv(DESTINATION_PATH, gaps) + write_inventory_gaps_markdown(MARKDOWN_PATH, gaps, sets_by_num) + + +if __name__ == "__main__": + main() diff --git a/tests/test_colors_grid_plot.py b/tests/test_colors_grid_plot.py new file mode 100644 index 0000000..f813968 --- /dev/null +++ b/tests/test_colors_grid_plot.py @@ -0,0 +1,63 @@ +"""Tests de la visualisation des couleurs utilisées.""" + +from pathlib import Path + +import matplotlib + +from lib.plots.colors_grid import build_hex_positions, load_used_colors, plot_colors_grid + + +matplotlib.use("Agg") + + +def write_csv(path: Path, headers: list[str], rows: list[list[str]]) -> None: + """Écrit un CSV simple pour les besoins de tests.""" + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", newline="") as csv_file: + import csv + + writer = csv.writer(csv_file) + writer.writerow(headers) + writer.writerows(rows) + + +def test_build_hex_positions() -> None: + """Construit suffisamment de positions pour toutes les couleurs.""" + positions = build_hex_positions(10, columns=4, spacing=1.0) + assert len(positions) == 10 + assert positions[0] == (0.0, 0.0) + assert positions[1][0] > positions[0][0] + + +def test_plot_colors_grid(tmp_path: Path) -> None: + """Produit un fichier image avec les couleurs utilisées.""" + parts_path = tmp_path / "parts_filtered.csv" + colors_path = tmp_path / "colors.csv" + destination_path = tmp_path / "colors_grid.png" + + write_csv( + parts_path, + ["part_num", "color_rgb", "is_translucent", "set_id", "quantity_in_set", "is_spare"], + [ + ["3001", "FFFFFF", "false", "1000", "2", "false"], + ["3002", "000000", "true", "1000", "5", "false"], + ["3003", "FF0000", "false", "1000", "1", "true"], + ], + ) + write_csv( + colors_path, + ["id", "name", "rgb", "is_trans", "num_parts", "num_sets", "y1", "y2"], + [ + ["1", "White", "FFFFFF", "False", "0", "0", "0", "0"], + ["2", "Black", "000000", "True", "0", "0", "0", "0"], + ["3", "Red", "FF0000", "False", "0", "0", "0", "0"], + ], + ) + + colors = load_used_colors(parts_path, colors_path) + assert len(colors) == 3 + + plot_colors_grid(parts_path, colors_path, destination_path) + + assert destination_path.exists() + assert destination_path.stat().st_size > 0 diff --git a/tests/test_downloader.py b/tests/test_downloader.py new file mode 100644 index 0000000..378a9f0 --- /dev/null +++ b/tests/test_downloader.py @@ -0,0 +1,91 @@ +"""Tests du module de téléchargement Rebrickable.""" + +import gzip +from pathlib import Path + +import responses + +from lib.rebrickable.downloader import ( + build_rebrickable_url, + download_rebrickable_file, + download_rebrickable_files, +) + + +def test_build_rebrickable_url() -> None: + """Construit l'URL complète vers Rebrickable.""" + assert build_rebrickable_url("themes.csv.gz") == ( + "https://cdn.rebrickable.com/media/downloads/themes.csv.gz" + ) + + +@responses.activate +def test_download_rebrickable_file(tmp_path: Path) -> None: + """Télécharge, enregistre et décompresse le fichier compressé.""" + file_name = "themes.csv.gz" + uncompressed_content = b"compressed-data" + compressed_body = gzip.compress(uncompressed_content) + responses.add( + responses.GET, + build_rebrickable_url(file_name), + body=compressed_body, + status=200, + ) + + target_path = download_rebrickable_file(file_name, tmp_path) + + assert target_path == tmp_path / "themes.csv" + assert target_path.read_bytes() == uncompressed_content + assert not (tmp_path / file_name).exists() + + +@responses.activate +def test_download_skips_when_cache_is_fresh(tmp_path: Path) -> None: + """Ne retélécharge pas un fichier récent et conserve le contenu.""" + file_name = "themes.csv.gz" + cached_path = tmp_path / "themes.csv" + cached_path.write_bytes(b"cached") + + target_path = download_rebrickable_file(file_name, tmp_path) + + assert target_path == cached_path + assert target_path.read_bytes() == b"cached" + assert not (tmp_path / file_name).exists() + assert len(responses.calls) == 0 + + +@responses.activate +def test_download_multiple_rebrickable_files(tmp_path: Path) -> None: + """Télécharge plusieurs fichiers compressés et les décompresse.""" + file_names = [ + "inventories.csv.gz", + "inventory_parts.csv.gz", + "parts.csv.gz", + "colors.csv.gz", + ] + compressed_bodies = {} + for file_name in file_names: + uncompressed_content = file_name.encode() + compressed_body = gzip.compress(uncompressed_content) + compressed_bodies[file_name] = compressed_body + responses.add( + responses.GET, + build_rebrickable_url(file_name), + body=compressed_body, + status=200, + ) + + downloaded_paths = download_rebrickable_files(file_names, tmp_path) + + assert downloaded_paths == [ + tmp_path / "inventories.csv", + tmp_path / "inventory_parts.csv", + tmp_path / "parts.csv", + tmp_path / "colors.csv", + ] + assert len(responses.calls) == len(file_names) + for file_name in file_names: + target_path = tmp_path / file_name + decompressed_path = target_path.with_suffix("") + assert decompressed_path.read_bytes() == file_name.encode() + assert not target_path.exists() diff --git a/tests/test_enrich_sets.py b/tests/test_enrich_sets.py new file mode 100644 index 0000000..2157589 --- /dev/null +++ b/tests/test_enrich_sets.py @@ -0,0 +1,77 @@ +"""Tests de l'enrichissement des sets filtrés.""" + +from pathlib import Path + +from lib.rebrickable.enrich_sets import ( + build_rebrickable_set_url, + enrich_sets, + extract_set_id, + load_owned_set_ids, + parse_set_collection_root, + write_missing_sets_markdown, +) + + +def test_extract_set_id_removes_revision() -> None: + """Supprime la révision de l'identifiant set_num.""" + assert extract_set_id("75936-1") == "75936" + + +def test_build_rebrickable_set_url() -> None: + """Construit l'URL publique Rebrickable à partir du set_num.""" + assert build_rebrickable_set_url("75936-1") == "https://rebrickable.com/sets/75936-1" + + +def test_parse_set_collection_root_empty_returns_none() -> None: + """Renvoie None pour une valeur vide.""" + assert parse_set_collection_root(" ") is None + + +def test_load_owned_set_ids_handles_missing_and_collects(tmp_path: Path) -> None: + """Retourne les sets présents sous forme de dossiers, vide si rien n'existe.""" + missing_root = tmp_path / "absent" + assert load_owned_set_ids(missing_root) == set() + + root = tmp_path / "collection" + root.mkdir() + (root / "75936").mkdir() + (root / "75944").mkdir() + assert load_owned_set_ids(root) == {"75936", "75944"} + + +def test_enrich_sets_adds_columns_and_collection(tmp_path: Path) -> None: + """Enrichit le CSV avec set_id, URL et possession.""" + source = tmp_path / "sets_filtered.csv" + destination = tmp_path / "sets_enriched.csv" + source.write_text( + "set_num,name,year,theme_id\n" + "75936-1,T. rex Rampage,2019,602\n" + "10757-1,Raptor Rescue Truck,2018,620\n" + ) + + enrich_sets(source, destination, {"75936"}) + + assert destination.read_text() == ( + "set_num,name,year,theme_id,set_id,rebrickable_url,in_collection\n" + "75936-1,T. rex Rampage,2019,602,75936,https://rebrickable.com/sets/75936-1,true\n" + "10757-1,Raptor Rescue Truck,2018,620,10757,https://rebrickable.com/sets/10757-1,false\n" + ) + + +def test_write_missing_sets_markdown(tmp_path: Path) -> None: + """Construit un tableau Markdown des sets non possédés.""" + enriched = tmp_path / "sets_enriched.csv" + markdown = tmp_path / "sets_missing.md" + enriched.write_text( + "set_num,name,year,theme_id,set_id,rebrickable_url,in_collection\n" + "75936-1,T. rex Rampage,2019,602,75936,https://rebrickable.com/sets/75936-1,true\n" + "10757-1,Raptor Rescue Truck,2018,620,10757,https://rebrickable.com/sets/10757-1,false\n" + ) + + write_missing_sets_markdown(enriched, markdown) + + assert markdown.read_text() == ( + "| set_id | year | name |\n" + "| --- | --- | --- |\n" + "| [10757](https://rebrickable.com/sets/10757-1) | 2018 | Raptor Rescue Truck |\n" + ) diff --git a/tests/test_filter_sets.py b/tests/test_filter_sets.py new file mode 100644 index 0000000..8e5d179 --- /dev/null +++ b/tests/test_filter_sets.py @@ -0,0 +1,37 @@ +"""Tests du filtrage des sets par thèmes.""" + +from pathlib import Path + +import pytest + +from lib.rebrickable.filter_sets import filter_sets_by_theme, parse_theme_ids + + +def test_parse_theme_ids_strips_and_validates() -> None: + """Nettoie la liste et refuse une valeur vide.""" + assert parse_theme_ids(" 274 , 602 ,620") == ["274", "602", "620"] + with pytest.raises(ValueError): + parse_theme_ids(" , , ") + + +def test_filter_sets_by_theme(tmp_path: Path) -> None: + """Conserve uniquement les sets des thèmes ciblés avec pièces et préserve l'entête.""" + source = tmp_path / "sets.csv" + destination = tmp_path / "filtered.csv" + overrides = tmp_path / "overrides.csv" + source.write_text( + "set_num,name,year,theme_id,num_parts,img_url\n" + "75936,T. rex Rampage,2019,602,3120,https://example\n" + "43221,100 Years of Disney Animation Icons,2023,710,0,https://example\n" + "75944,Indominus rex vs. Ankylosaurus,2020,602,1000,https://example\n" + "10757,Raptor Rescue Truck,2018,620,0,https://example\n" + ) + overrides.write_text("set_num,num_parts\n75936,3121\n") + + filter_sets_by_theme(source, destination, ["602"], overrides) + + assert destination.read_text() == ( + "set_num,name,year,theme_id,num_parts,img_url\n" + "75936,T. rex Rampage,2019,602,3121,https://example\n" + "75944,Indominus rex vs. Ankylosaurus,2020,602,1000,https://example\n" + ) diff --git a/tests/test_inventory_gaps.py b/tests/test_inventory_gaps.py new file mode 100644 index 0000000..4729d80 --- /dev/null +++ b/tests/test_inventory_gaps.py @@ -0,0 +1,144 @@ +"""Tests des écarts d'inventaire calculés depuis parts_filtered.csv.""" + +import csv +from pathlib import Path + +from lib.rebrickable.inventory_reconciliation import ( + compute_inventory_gaps, + index_sets_by_num, + write_inventory_gaps_csv, + write_inventory_gaps_markdown, +) + + +def write_csv(path: Path, headers: list[str], rows: list[list[str]]) -> None: + """Écrit un CSV simple pour les besoins des tests.""" + with path.open("w", newline="") as csv_file: + writer = csv.writer(csv_file) + writer.writerow(headers) + writer.writerows(rows) + + +def test_compute_inventory_gaps_excludes_spares(tmp_path: Path) -> None: + """Ignore les pièces de rechange et ne conserve que les sets en écart.""" + sets_path = tmp_path / "sets_enriched.csv" + parts_path = tmp_path / "parts_filtered.csv" + write_csv( + sets_path, + ["set_num", "set_id", "num_parts", "in_collection"], + [ + ["1000-1", "1000", "4", "true"], + ["2000-1", "2000", "3", "false"], + ["3000-1", "3000", "1", "true"], + ], + ) + write_csv( + parts_path, + ["part_num", "color_rgb", "is_translucent", "set_num", "set_id", "quantity_in_set", "is_spare"], + [ + ["A", "AAAAAA", "false", "1000-1", "1000", "2", "false"], + ["B", "BBBBBB", "false", "1000-1", "1000", "2", "false"], + ["S", "SSSSSS", "false", "1000-1", "1000", "5", "true"], + ["C", "CCCCCC", "false", "2000-1", "2000", "2", "false"], + ["D", "DDDDDD", "false", "3000-1", "3000", "1", "false"], + ], + ) + + gaps = compute_inventory_gaps(sets_path, parts_path) + + assert gaps == [ + { + "set_num": "1000-1", + "set_id": "1000", + "expected_parts": 4, + "inventory_parts": 9, + "inventory_parts_non_spare": 4, + "delta": 5, + "delta_non_spare": 0, + "in_collection": "true", + }, + { + "set_num": "2000-1", + "set_id": "2000", + "expected_parts": 3, + "inventory_parts": 2, + "inventory_parts_non_spare": 2, + "delta": 1, + "delta_non_spare": 1, + "in_collection": "false", + } + ] + + +def test_write_inventory_gaps_csv(tmp_path: Path) -> None: + """Sérialise le rapport d'écarts dans un CSV dédié.""" + destination_path = tmp_path / "inventory_gaps.csv" + rows = [ + { + "set_num": "2000-1", + "set_id": "2000", + "expected_parts": 3, + "inventory_parts": 2, + "inventory_parts_non_spare": 2, + "delta": 1, + "delta_non_spare": 1, + "in_collection": "false", + } + ] + + write_inventory_gaps_csv(destination_path, rows) + + with destination_path.open() as csv_file: + written_rows = list(csv.DictReader(csv_file)) + + assert written_rows == [ + { + "set_num": "2000-1", + "set_id": "2000", + "expected_parts": "3", + "inventory_parts": "2", + "inventory_parts_non_spare": "2", + "delta": "1", + "delta_non_spare": "1", + "in_collection": "false", + } + ] + + +def test_write_inventory_gaps_markdown(tmp_path: Path) -> None: + """Produit un tableau Markdown listant les sets en écart.""" + destination_path = tmp_path / "inventory_gaps.md" + gaps = [ + { + "set_num": "2000-1", + "set_id": "2000", + "expected_parts": 3, + "inventory_parts": 2, + "inventory_parts_non_spare": 2, + "delta": 1, + "delta_non_spare": 1, + "in_collection": "false", + } + ] + sets = [ + { + "set_num": "2000-1", + "set_id": "2000", + "num_parts": "3", + "name": "Test Set", + "year": "2020", + "rebrickable_url": "https://rebrickable.com/sets/2000-1", + "in_collection": "false", + } + ] + + write_inventory_gaps_markdown(destination_path, gaps, index_sets_by_num(sets)) + + with destination_path.open() as markdown_file: + content = markdown_file.read().splitlines() + + assert content[0].startswith("| set_id | name |") + assert ( + "| [2000](https://rebrickable.com/sets/2000-1) | Test Set | 2020 | 1 | 1 | 3 | 2 | 2 | false | [PDF](https://www.lego.com/service/buildinginstructions/2000) |" + in content + ) diff --git a/tests/test_milestones.py b/tests/test_milestones.py new file mode 100644 index 0000000..9ddaf9e --- /dev/null +++ b/tests/test_milestones.py @@ -0,0 +1,22 @@ +"""Tests du chargement des jalons configurables.""" + +from pathlib import Path + +from lib.milestones import load_milestones + + +def test_load_milestones_reads_csv(tmp_path: Path) -> None: + """Charge le CSV et convertit l'année en entier.""" + source = tmp_path / "milestones.csv" + source.write_text( + "year,description\n" + "1993,Sortie du film Jurassic Park\n" + "1997,Sortie du film The Lost World: Jurassic Park\n" + ) + + milestones = load_milestones(source) + + assert milestones == [ + {"year": 1993, "description": "Sortie du film Jurassic Park"}, + {"year": 1997, "description": "Sortie du film The Lost World: Jurassic Park"}, + ] diff --git a/tests/test_parts_inventory.py b/tests/test_parts_inventory.py new file mode 100644 index 0000000..0c91452 --- /dev/null +++ b/tests/test_parts_inventory.py @@ -0,0 +1,140 @@ +"""Tests de construction du fichier parts_filtered.csv.""" + +import csv +from pathlib import Path + +from lib.rebrickable.parts_inventory import write_parts_filtered + + +def write_csv(path: Path, headers: list[str], rows: list[list[str]]) -> None: + """Écrit un CSV simple pour les besoins de tests.""" + with path.open("w", newline="") as csv_file: + writer = csv.writer(csv_file) + writer.writerow(headers) + writer.writerows(rows) + + +def test_write_parts_filtered(tmp_path: Path) -> None: + """Assemble les pièces par set avec la dernière version d'inventaire.""" + sets_path = tmp_path / "sets_enriched.csv" + inventories_path = tmp_path / "inventories.csv" + inventory_parts_path = tmp_path / "inventory_parts.csv" + colors_path = tmp_path / "colors.csv" + inventory_minifigs_path = tmp_path / "inventory_minifigs.csv" + minifigs_path = tmp_path / "minifigs.csv" + destination_path = tmp_path / "parts_filtered.csv" + + write_csv( + sets_path, + ["set_num", "set_id", "name", "num_parts"], + [ + ["1234-1", "1234", "Sample Set A", "9"], + ["5678-1", "5678", "Sample Set B", "2"], + ], + ) + write_csv( + inventories_path, + ["id", "version", "set_num"], + [ + ["1", "1", "1234-1"], + ["2", "2", "1234-1"], + ["3", "1", "5678-1"], + ["4", "1", "fig-123"], + ], + ) + write_csv( + inventory_parts_path, + ["inventory_id", "part_num", "color_id", "quantity", "is_spare", "img_url"], + [ + ["2", "3001", "1", "4", "False", ""], + ["2", "3002", "2", "1", "True", ""], + ["3", "3003", "3", "2", "False", ""], + ["4", "mf-1", "2", "1", "False", ""], + ["4", "mf-2", "3", "2", "False", ""], + ], + ) + write_csv( + inventory_minifigs_path, + ["inventory_id", "fig_num", "quantity"], + [ + ["2", "fig-123", "1"], + ], + ) + write_csv( + minifigs_path, + ["fig_num", "name", "num_parts", "img_url"], + [ + ["fig-123", "Sample Minifig", "2", ""], + ], + ) + write_csv( + colors_path, + ["id", "name", "rgb", "is_trans", "num_parts", "num_sets", "y1", "y2"], + [ + ["1", "White", "FFFFFF", "False", "0", "0", "0", "0"], + ["2", "Black", "000000", "True", "0", "0", "0", "0"], + ["3", "Red", "FF0000", "False", "0", "0", "0", "0"], + ], + ) + + write_parts_filtered( + sets_path, + inventories_path, + inventory_parts_path, + colors_path, + inventory_minifigs_path, + minifigs_path, + destination_path, + ) + + with destination_path.open() as result_file: + reader = csv.DictReader(result_file) + rows = list(reader) + + assert rows == [ + { + "part_num": "3001", + "color_rgb": "FFFFFF", + "is_translucent": "false", + "set_num": "1234-1", + "set_id": "1234", + "quantity_in_set": "4", + "is_spare": "false", + }, + { + "part_num": "3002", + "color_rgb": "000000", + "is_translucent": "true", + "set_num": "1234-1", + "set_id": "1234", + "quantity_in_set": "1", + "is_spare": "true", + }, + { + "part_num": "mf-1", + "color_rgb": "000000", + "is_translucent": "true", + "set_num": "1234-1", + "set_id": "1234", + "quantity_in_set": "1", + "is_spare": "false", + }, + { + "part_num": "mf-2", + "color_rgb": "FF0000", + "is_translucent": "false", + "set_num": "1234-1", + "set_id": "1234", + "quantity_in_set": "2", + "is_spare": "false", + }, + { + "part_num": "3003", + "color_rgb": "FF0000", + "is_translucent": "false", + "set_num": "5678-1", + "set_id": "5678", + "quantity_in_set": "2", + "is_spare": "false", + }, + ] diff --git a/tests/test_parts_per_set_plot.py b/tests/test_parts_per_set_plot.py new file mode 100644 index 0000000..f53a551 --- /dev/null +++ b/tests/test_parts_per_set_plot.py @@ -0,0 +1,54 @@ +"""Tests des graphiques sur la moyenne de pièces par set.""" + +from pathlib import Path + +import matplotlib + +from lib.plots.parts_per_set import ( + compute_average_parts_per_set, + compute_rolling_mean, + plot_parts_per_set, +) + + +matplotlib.use("Agg") + + +def test_compute_average_parts_per_set() -> None: + """Calcule la moyenne annuelle pièces/set.""" + rows = [ + {"year": "2020", "num_parts": "100"}, + {"year": "2020", "num_parts": "200"}, + {"year": "2021", "num_parts": "150"}, + ] + + series = compute_average_parts_per_set(rows) + + assert series == [(2020, 150.0), (2021, 150.0)] + + +def test_compute_rolling_mean() -> None: + """Calcule une moyenne glissante 2 ans.""" + series = [(2020, 100.0), (2021, 200.0), (2022, 300.0)] + + rolling = compute_rolling_mean(series, 2) + + assert rolling == [(2020, 0.0), (2021, 150.0), (2022, 250.0)] + + +def test_plot_parts_per_set_creates_figure(tmp_path: Path) -> None: + """Génère le fichier image avec moyennes annuelle et glissante.""" + enriched = tmp_path / "sets_enriched.csv" + milestones = tmp_path / "milestones.csv" + destination = tmp_path / "figures" / "step07" / "avg_parts_per_set.png" + enriched.write_text( + "set_num,name,year,theme_id,num_parts,img_url,set_id,rebrickable_url,in_collection\n" + "75936-1,T. rex Rampage,2019,602,3120,https://example,75936,https://example,true\n" + "75944-1,Indominus rex vs. Ankylosaurus,2020,602,1000,https://example,75944,https://example,false\n" + ) + milestones.write_text("year,description\n2019,LEGO Jurassic World: Legend of Isla Nublar\n") + + plot_parts_per_set(enriched, milestones, destination) + + assert destination.exists() + assert destination.stat().st_size > 0 diff --git a/tests/test_parts_stats.py b/tests/test_parts_stats.py new file mode 100644 index 0000000..102fdcb --- /dev/null +++ b/tests/test_parts_stats.py @@ -0,0 +1,85 @@ +"""Tests des statistiques simples sur les pièces filtrées.""" + +import csv +from pathlib import Path + +from lib.rebrickable.parts_stats import build_stats, read_rows, write_parts_stats + + +def write_csv(path: Path, headers: list[str], rows: list[list[str]]) -> None: + """Écrit un CSV simple pour les besoins de tests.""" + with path.open("w", newline="") as csv_file: + writer = csv.writer(csv_file) + writer.writerow(headers) + writer.writerows(rows) + + +def test_build_stats(tmp_path: Path) -> None: + """Calcule les statistiques principales sans les pièces de rechange.""" + parts_path = tmp_path / "parts_filtered.csv" + sets_path = tmp_path / "sets_enriched.csv" + stats_path = tmp_path / "stats.csv" + write_csv( + parts_path, + ["part_num", "color_rgb", "is_translucent", "set_num", "set_id", "quantity_in_set", "is_spare"], + [ + ["3001", "FFFFFF", "false", "1000-1", "1000", "2", "false"], + ["3001", "FFFFFF", "false", "2000-1", "2000", "1", "false"], + ["3002", "000000", "true", "1000-1", "1000", "5", "false"], + ["3003", "FF0000", "false", "1000-1", "1000", "1", "true"], + ], + ) + write_csv( + sets_path, + ["set_num", "set_id", "num_parts", "in_collection"], + [ + ["1000-1", "1000", "8", "true"], + ["2000-1", "2000", "1", "false"], + ], + ) + write_csv( + stats_path, + ["libelle", "valeur"], + [ + ["Total de pièces pour les thèmes filtrés", "9"], + ], + ) + + stats = build_stats(read_rows(parts_path), sets_path, parts_path, stats_path) + + assert stats == [ + ("Total de variations de pièces (hors rechanges)", "2"), + ( + "Pièce la moins utilisée (référence + couleur)", + "3001 / FFFFFF / false (3)", + ), + ( + "Pièce la plus commune (référence + couleur)", + "3002 / 000000 / true (5)", + ), + ("Total de couleurs utilisées (hors rechanges)", "2"), + ("Total de pièces hors rechanges", "8"), + ("Ecart total catalogue (stats) - inventaire (hors rechanges)", "1"), + ("Nombre de sets en écart inventaire/catalogue", "0"), + ("Ecart maximal inventaire/catalogue", "none (0)"), + ] + + +def test_write_parts_stats(tmp_path: Path) -> None: + """Écrit un CSV de statistiques.""" + destination_path = tmp_path / "parts_stats.csv" + stats = [ + ("A", "1"), + ("B", "2"), + ] + + write_parts_stats(destination_path, stats) + + with destination_path.open() as csv_file: + rows = list(csv.reader(csv_file)) + + assert rows == [ + ["libelle", "valeur"], + ["A", "1"], + ["B", "2"], + ] diff --git a/tests/test_sets_per_year_plot.py b/tests/test_sets_per_year_plot.py new file mode 100644 index 0000000..9b22d1a --- /dev/null +++ b/tests/test_sets_per_year_plot.py @@ -0,0 +1,60 @@ +"""Tests du graphique des sets par année.""" + +import matplotlib +from pathlib import Path + +from lib.plots.sets_per_year import ( + compute_parts_per_year, + compute_sets_per_year, + plot_sets_per_year, +) + + +matplotlib.use("Agg") + + +def test_compute_sets_per_year_counts_and_sorts() -> None: + """Compte les sets par année et renvoie une liste triée.""" + rows = [ + {"year": "2020"}, + {"year": "2019"}, + {"year": "2020"}, + ] + + series = compute_sets_per_year(rows) + + assert series == [(2019, 1), (2020, 2)] + + +def test_compute_parts_per_year_sums_and_sorts() -> None: + """Somme les pièces par année et renvoie une liste triée.""" + rows = [ + {"year": "2020", "num_parts": "10"}, + {"year": "2019", "num_parts": "5"}, + {"year": "2020", "num_parts": "1"}, + ] + + series = compute_parts_per_year(rows) + + assert series == [(2019, 5), (2020, 11)] + + +def test_plot_sets_per_year_creates_figure(tmp_path: Path) -> None: + """Génère un fichier image avec les jalons fournis.""" + enriched = tmp_path / "sets_enriched.csv" + milestones = tmp_path / "milestones.csv" + destination = tmp_path / "figures" / "step07" / "sets_per_year.png" + enriched.write_text( + "set_num,name,year,theme_id,num_parts,img_url,set_id,rebrickable_url,in_collection\n" + "75936-1,T. rex Rampage,2019,602,3120,https://example,75936,https://example,true\n" + "75944-1,Indominus rex vs. Ankylosaurus,2020,602,1000,https://example,75944,https://example,false\n" + ) + milestones.write_text( + "year,description\n" + "2019,Diffusion LEGO Jurassic World: Legend of Isla Nublar\n" + ) + + plot_sets_per_year(enriched, milestones, destination) + + assert destination.exists() + assert destination.stat().st_size > 0 diff --git a/tests/test_stats.py b/tests/test_stats.py new file mode 100644 index 0000000..ff15240 --- /dev/null +++ b/tests/test_stats.py @@ -0,0 +1,83 @@ +"""Tests des statistiques calculées sur les sets LEGO filtrés.""" + +from lib.rebrickable.stats import compute_basic_stats, write_stats_csv + + +def test_compute_basic_stats_returns_expected_values(tmp_path) -> None: + """Calcule les statistiques principales sur un échantillon maîtrisé.""" + themes = [ + {"id": "602", "name": "Jurassic World", "parent_id": ""}, + {"id": "274", "name": "Jurassic Park III", "parent_id": "273"}, + ] + all_sets = [ + {"set_num": "123-1", "name": "A", "year": "2020", "theme_id": "602", "num_parts": "100", "img_url": ""}, + {"set_num": "124-1", "name": "B", "year": "2021", "theme_id": "602", "num_parts": "200", "img_url": ""}, + {"set_num": "125-1", "name": "C", "year": "2021", "theme_id": "274", "num_parts": "300", "img_url": ""}, + ] + filtered_sets = [ + {"set_num": "123-1", "name": "A", "year": "2020", "theme_id": "602", "num_parts": "100", "img_url": ""}, + {"set_num": "124-1", "name": "B", "year": "2021", "theme_id": "602", "num_parts": "200", "img_url": ""}, + ] + enriched_sets = [ + { + "set_num": "123-1", + "name": "A", + "year": "2020", + "theme_id": "602", + "num_parts": "100", + "img_url": "", + "set_id": "123", + "rebrickable_url": "", + "in_collection": "true", + }, + { + "set_num": "124-1", + "name": "B", + "year": "2021", + "theme_id": "602", + "num_parts": "200", + "img_url": "", + "set_id": "124", + "rebrickable_url": "", + "in_collection": "false", + }, + ] + + stats = compute_basic_stats(themes, all_sets, filtered_sets, enriched_sets) + + assert stats == [ + ("Nombre total de sets (catalogue complet)", "3"), + ("Nombre total de thèmes (catalogue complet)", "2"), + ("Nombre de sets après filtrage (thèmes ciblés)", "2"), + ("Nombre moyen de sets par thème (catalogue complet)", "1.50"), + ("Pourcentage des sets filtrés vs total", "66.67%"), + ("Taux de possession (thèmes filtrés)", "50.00%"), + ("Sets dans la collection", "1"), + ("Sets manquants pour la collection", "1"), + ("Nombre moyen de pièces par set (thèmes filtrés)", "150.00"), + ("Médiane de pièces par set (thèmes filtrés)", "150.00"), + ("Nombre moyen de sets commercialisés par an (thèmes filtrés)", "1.00"), + ("Total de pièces pour les thèmes filtrés", "300"), + ("Total de pièces des sets possédés", "100"), + ("Pourcentage de pièces possédées (thèmes filtrés)", "33.33%"), + ("Nombre de thèmes filtrés", "1"), + ("Première année de sortie (thèmes filtrés)", "2020"), + ("Dernière année de sortie (thèmes filtrés)", "2021"), + ("Année la plus prolifique (thèmes filtrés)", "2020 (1 sets)"), + ("Set avec le plus de pièces (thèmes filtrés)", "124-1 - B (200 pièces)"), + ("Set avec le moins de pièces (thèmes filtrés)", "123-1 - A (100 pièces)"), + ("Set le plus ancien (thèmes filtrés)", "123-1 - A (2020)"), + ("Set le plus récent (thèmes filtrés)", "124-1 - B (2021)"), + ("Nombre moyen de pièces des sets possédés", "100.00"), + ("Nombre moyen de pièces des sets manquants", "200.00"), + ] + + +def test_write_stats_csv_outputs_two_columns(tmp_path) -> None: + """Écrit un CSV simple avec libellé et valeur.""" + destination = tmp_path / "stats.csv" + stats = [("A", "1"), ("B", "2")] + + write_stats_csv(destination, stats) + + assert destination.read_text() == "libelle,valeur\nA,1\nB,2\n"