1

Premiers éléments de l'étude

This commit is contained in:
2025-12-01 21:57:05 +01:00
commit 22b4dae0ba
46 changed files with 2595 additions and 0 deletions

1
lib/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Fonctions de support pour l'étude des sets LEGO."""

59
lib/color_sort.py Normal file
View File

@@ -0,0 +1,59 @@
"""Outils de tri de couleurs dans un espace perceptuel."""
import math
from typing import Iterable, List, Tuple
import numpy as np
from colorspacious import cspace_convert
def hex_to_rgb_unit(hex_value: str) -> np.ndarray:
    """Convert a hexadecimal colour code into a unit-range RGB array.

    Args:
        hex_value: Six hexadecimal digits ("RRGGBB"), optionally preceded by
            "#" and/or surrounded by whitespace.

    Returns:
        A float array of three values (red, green, blue), each in [0, 1].

    Raises:
        ValueError: If fewer than six digits are given or a pair of
            characters is not valid hexadecimal.
    """
    digits = hex_value.strip().lstrip("#")
    if len(digits) < 6:
        # A 5-character value used to be parsed silently with a one-digit blue channel.
        raise ValueError(f"Expected a 6-digit hexadecimal colour, got {hex_value!r}")
    # Characters beyond the sixth are ignored, as before.
    return np.array([int(digits[index : index + 2], 16) / 255 for index in (0, 2, 4)], dtype=float)
def lab_components(hex_value: str) -> Tuple[float, float, float, float, float]:
    """Return (hue_angle, chroma, lightness, a*, b*) for a colour.

    The colour is converted from sRGB to CIELab. The hue angle is
    atan2(b*, a*) in radians and the chroma is the length of the (a*, b*)
    vector.
    """
    rgb_unit = hex_to_rgb_unit(hex_value)
    lightness, a_star, b_star = cspace_convert(rgb_unit, "sRGB1", "CIELab")
    return (
        math.atan2(b_star, a_star),
        math.hypot(a_star, b_star),
        lightness,
        a_star,
        b_star,
    )
def sort_hex_colors_lab(
    hex_values: Iterable[str],
    hue_offset_degrees: float = 60.0,
    neutral_threshold: float = 3.0,
) -> List[str]:
    """Sort colours by perceptual hue, then chroma, then lightness.

    Args:
        hex_values: Hexadecimal colour codes to sort.
        hue_offset_degrees: Rotation applied to the hue circle before
            sorting (default 60 degrees). Any value is accepted; the rotated
            hue is wrapped back into one full turn.
        neutral_threshold: Colours whose chroma is below this value are
            treated as neutral.

    Returns:
        The chromatic colours ordered by (rotated hue, decreasing chroma,
        lightness), followed by the neutral colours ordered by lightness.
    """
    offset_radians = math.radians(hue_offset_degrees)
    full_turn = 2 * math.pi
    chromatic: List[Tuple[float, float, float, str]] = []
    neutrals: List[Tuple[float, str]] = []
    for hex_value in hex_values:
        hue_angle, chroma, lightness, _, _ = lab_components(hex_value)
        if chroma < neutral_threshold:
            neutrals.append((lightness, hex_value))
            continue
        # Modulo keeps the hue inside one turn for any offset; adding 2*pi
        # once only worked for offsets close to the default.
        hue = (hue_angle + offset_radians) % full_turn
        chromatic.append((hue, -chroma, lightness, hex_value))
    chromatic.sort()
    neutrals.sort()
    return [item[3] for item in chromatic] + [item[1] for item in neutrals]
def lab_sort_key(hex_value: str, hue_offset_degrees: float = 60.0, neutral_threshold: float = 3.0) -> Tuple[int, float, float, float]:
    """Return a standalone sort key for one colour.

    Args:
        hex_value: Hexadecimal colour code.
        hue_offset_degrees: Rotation applied to the hue circle; the rotated
            hue is wrapped back into one full turn for any value.
        neutral_threshold: Chroma below which the colour counts as neutral.

    Returns:
        (0, hue, -chroma, lightness) for chromatic colours and
        (1, 0.0, lightness, chroma) for neutral ones, so neutrals sort last.
    """
    hue_angle, chroma, lightness, _, _ = lab_components(hex_value)
    if chroma < neutral_threshold:
        return (1, 0.0, lightness, chroma)
    # Modulo wraps correctly for any offset, not only those near the default.
    hue = (hue_angle + math.radians(hue_offset_degrees)) % (2 * math.pi)
    return (0, hue, -chroma, lightness)

8
lib/filesystem.py Normal file
View File

@@ -0,0 +1,8 @@
"""Fonctions utilitaires pour manipuler le système de fichiers."""
from pathlib import Path
def ensure_parent_dir(target_path: Path) -> None:
    """Create the parent directory of a file path when it does not exist yet.

    Missing intermediate directories are created as well; an existing
    directory is left untouched.
    """
    parent_dir = target_path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)

15
lib/milestones.py Normal file
View File

@@ -0,0 +1,15 @@
"""Chargement des jalons (milestones) thématiques configurables."""
import csv
from pathlib import Path
from typing import List
def load_milestones(path: Path) -> List[dict]:
    """Load the milestones from a CSV file with `year` and `description` columns.

    Args:
        path: Location of the CSV file (expected to be UTF-8, with or
            without a byte-order mark).

    Returns:
        One dict per row, with `year` converted to int and `description`
        kept as text, in file order.

    Raises:
        KeyError: If a required column is missing.
        ValueError: If a year is not an integer.
    """
    # An explicit encoding keeps accented descriptions intact whatever the
    # platform default is; newline="" is what the csv module expects.
    with path.open(encoding="utf-8-sig", newline="") as csv_file:
        return [
            {"year": int(row["year"]), "description": row["description"]}
            for row in csv.DictReader(csv_file)
        ]

1
lib/plots/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Utilitaires de visualisation des données LEGO."""

174
lib/plots/colors_grid.py Normal file
View File

@@ -0,0 +1,174 @@
"""Visualisation des couleurs utilisées dans l'inventaire filtré."""
from pathlib import Path
from typing import Dict, Iterable, List, Tuple
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.lines import Line2D
from lib.filesystem import ensure_parent_dir
from lib.color_sort import lab_sort_key, sort_hex_colors_lab
from lib.rebrickable.parts_inventory import normalize_boolean
from lib.rebrickable.stats import read_rows
def sort_colors_perceptually(colors: Iterable[dict]) -> List[dict]:
    """Sort colour records through the Lab space (perceived hue, chroma, lightness).

    Args:
        colors: Records that each carry a `color_rgb` hexadecimal code. Any
            iterable is accepted, including one-shot generators.

    Returns:
        A new list of the same records, in the order produced by
        `sort_hex_colors_lab` for their `color_rgb` values.
    """
    # Materialise first: the records are walked twice, which would silently
    # produce an empty result with a generator.
    color_list = list(colors)
    ordered_hex = sort_hex_colors_lab(color["color_rgb"] for color in color_list)
    index_map = {hex_value: index for index, hex_value in enumerate(ordered_hex)}
    return sorted(color_list, key=lambda color: index_map[color["color_rgb"]])
def load_used_colors(parts_path: Path, colors_path: Path, minifig_only: bool = False) -> List[dict]:
    """Load the colours in use together with their total quantities.

    Rows are kept when their `is_minifig_part` flag matches `minifig_only`:
    only rows flagged "true" when it is set, only the other rows otherwise.
    No filtering on `is_spare` is done here. Quantities are summed per
    (`color_rgb`, `is_translucent`) pair, and the colour name is looked up in
    the colours file, falling back to the RGB code.

    Returns:
        Records with `color_rgb`, `is_translucent`, `name` and `quantity`,
        ordered by `sort_colors_perceptually`.
    """
    part_rows = read_rows(parts_path)
    names_by_key = {}
    for color_row in read_rows(colors_path):
        lookup_key = (color_row["rgb"], normalize_boolean(color_row["is_trans"]))
        names_by_key[lookup_key] = color_row["name"]
    quantity_by_key: Dict[Tuple[str, str], int] = {}
    wants_minifig = bool(minifig_only)
    for part_row in part_rows:
        is_minifig_part = part_row.get("is_minifig_part") == "true"
        if is_minifig_part != wants_minifig:
            continue
        pair = (part_row["color_rgb"], part_row["is_translucent"])
        quantity_by_key[pair] = quantity_by_key.get(pair, 0) + int(part_row["quantity_in_set"])
    records = [
        {
            "color_rgb": pair[0],
            "is_translucent": pair[1],
            "name": names_by_key.get(pair, pair[0]),
            "quantity": amount,
        }
        for pair, amount in quantity_by_key.items()
    ]
    return sort_colors_perceptually(records)
def build_hex_positions(count: int, columns: int = 9, spacing: float = 1.1) -> List[Tuple[float, float]]:
    """Build staggered (hexagon-like) grid positions for `count` markers.

    Rows are filled left to right, `columns` per row. Odd rows are shifted
    right by half a spacing, and each row sits 0.85 * spacing below the
    previous one.
    """
    row_height = spacing * 0.85
    row_total = (count + columns - 1) // columns
    cells = ((row, col) for row in range(row_total) for col in range(columns))
    layout: List[Tuple[float, float]] = []
    # zip() stops after `count` cells, which leaves the last row partially filled.
    for (row, col), _ in zip(cells, range(count)):
        shift = spacing / 2 if row % 2 else 0.0
        layout.append((col * spacing + shift, -row * row_height))
    return layout
def build_background(width: float, height: float, resolution: int = 600) -> np.ndarray:
    """Generate a gradient backdrop image for the colour grid.

    The image blends a radial falloff with a diagonal ramp over the square
    [-1, 1] x [-1, 1], clipped to [0.05, 0.95], with the red and green
    channels dimmed slightly relative to blue.

    Args:
        width: Not used by the computation.
        height: Not used by the computation.
        resolution: Number of samples along each side.

    Returns:
        An array of shape (resolution, resolution, 3).
    """
    axis = np.linspace(-1.0, 1.0, resolution)
    grid_x, grid_y = np.meshgrid(axis, axis)
    distance = np.sqrt(grid_x**2 + grid_y**2)
    ramp = (grid_x + grid_y) / 2
    intensity = np.clip(0.35 + 0.35 * (1 - distance) + 0.2 * ramp, 0.05, 0.95)
    return np.stack((intensity * 0.9, intensity * 0.92, intensity), axis=-1)
def plot_colors_grid(
    parts_path: Path,
    colors_path: Path,
    destination_path: Path,
    minifig_only: bool = False,
) -> None:
    """Draw a decorative grid of the colours in use and save it as an image.

    Args:
        parts_path: CSV of parts passed to `load_used_colors`.
        colors_path: CSV of colours passed to `load_used_colors`.
        destination_path: Image file to write; its parent directory is
            created when missing.
        minifig_only: Forwarded to `load_used_colors`; also selects the
            title and the legend height.

    Note:
        The `max()`/`min()` calls below raise ValueError when no colour is
        loaded.
    """
    colors = load_used_colors(parts_path, colors_path, minifig_only=minifig_only)
    positions = build_hex_positions(len(colors))
    x_values = [x for x, _ in positions]
    y_values = [y for _, y in positions]
    # Bounding box of the markers, padded by 0.75 on every side.
    width = max(x_values) - min(x_values) + 1.5
    height = max(y_values) - min(y_values) + 1.5
    fig, ax = plt.subplots(figsize=(10, 10), facecolor="#0b0c10")
    background = build_background(width, height)
    ax.imshow(
        background,
        extent=[min(x_values) - 0.75, min(x_values) - 0.75 + width, min(y_values) - 0.75, min(y_values) - 0.75 + height],
        origin="lower",
        zorder=0,
    )
    max_quantity = max(color["quantity"] for color in colors)
    min_marker = 720
    max_marker = 1600
    for (x, y), color in zip(positions, colors):
        is_translucent = color["is_translucent"] == "true"
        alpha = 0.65 if is_translucent else 1.0
        edge = "#f7f7f7" if is_translucent else "#0d0d0d"
        # Marker area grows linearly with the quantity, relative to the largest one.
        size = min_marker + (max_marker - min_marker) * (color["quantity"] / max_quantity)
        if is_translucent:
            # Faint, slightly larger white disc drawn underneath translucent markers.
            ax.scatter(
                x,
                y,
                s=size * 1.25,
                c="#ffffff",
                alpha=0.18,
                edgecolors="none",
                linewidths=0,
                zorder=2,
            )
        ax.scatter(
            x,
            y,
            s=size,
            c=f"#{color['color_rgb']}",
            alpha=alpha,
            edgecolors=edge,
            linewidths=1.1,
            zorder=3,
        )
    # Legend proxies reusing the edge colours and alpha of the two marker styles.
    legend_handles = [
        Line2D([0], [0], marker="o", color="none", markerfacecolor="#cccccc", markeredgecolor="#0d0d0d", markersize=10, label="Opaque"),
        Line2D(
            [0],
            [0],
            marker="o",
            color="none",
            markerfacecolor="#cccccc",
            markeredgecolor="#f7f7f7",
            markersize=10,
            alpha=0.65,
            label="Translucide",
        ),
    ]
    legend_y = 1.06 if not minifig_only else 1.08
    ax.legend(
        handles=legend_handles,
        loc="upper center",
        bbox_to_anchor=(0.5, legend_y),
        ncol=2,
        frameon=False,
        labelcolor="#f0f0f0",
    )
    title_prefix = "Palette des couleurs utilisées (rechanges incluses)"
    if minifig_only:
        title_prefix = "Palette des couleurs de minifigs (rechanges incluses)"
    ax.set_title(title_prefix, fontsize=14, color="#f0f0f0", pad=28)
    ax.set_xticks([])
    ax.set_yticks([])
    ax.set_xlim(min(x_values) - 1.0, max(x_values) + 1.0)
    ax.set_ylim(min(y_values) - 1.0, max(y_values) + 1.0)
    for spine in ax.spines.values():
        spine.set_visible(False)
    ensure_parent_dir(destination_path)
    fig.tight_layout()
    fig.savefig(destination_path, dpi=200)
    # Close the figure so repeated calls do not accumulate open figures.
    plt.close(fig)

110
lib/plots/parts_per_set.py Normal file
View File

@@ -0,0 +1,110 @@
"""Graphiques sur la taille moyenne des sets (pièces par set)."""
from pathlib import Path
from typing import Dict, Iterable, List, Tuple
import matplotlib.pyplot as plt
from lib.filesystem import ensure_parent_dir
from lib.milestones import load_milestones
from lib.rebrickable.stats import read_rows
def compute_average_parts_per_set(rows: Iterable[dict]) -> List[Tuple[int, float]]:
    """Compute the yearly mean number of parts per set.

    Each row must provide `year` and `num_parts` values convertible to int.
    The result is a list of (year, mean) tuples in ascending year order.
    """
    part_sums: Dict[int, int] = {}
    set_counts: Dict[int, int] = {}
    for row in rows:
        year = int(row["year"])
        part_sums[year] = part_sums.get(year, 0) + int(row["num_parts"])
        set_counts[year] = set_counts.get(year, 0) + 1
    return [(year, part_sums[year] / set_counts[year]) for year in sorted(part_sums)]
def compute_rolling_mean(series: List[Tuple[int, float]], window: int) -> List[Tuple[int, float]]:
    """Compute a trailing rolling mean over a (year, value) series.

    Args:
        series: (year, value) pairs in the order they should be averaged.
        window: Number of consecutive points per average; must be >= 1.

    Returns:
        One (year, mean) pair per input point. Points that do not yet have
        `window` predecessors (including themselves) are reported as 0.0.

    Raises:
        ValueError: If `window` is smaller than 1.
    """
    if window < 1:
        # Previously this ended in a ZeroDivisionError or meaningless values.
        raise ValueError(f"window must be a positive integer, got {window!r}")
    values = [value for _, value in series]
    rolling: List[Tuple[int, float]] = []
    for index, (year, _) in enumerate(series):
        if index + 1 < window:
            rolling.append((year, 0.0))
        else:
            window_values = values[index - window + 1 : index + 1]
            rolling.append((year, sum(window_values) / window))
    return rolling
def plot_parts_per_set(
    enriched_sets_path: Path,
    milestones_path: Path,
    destination_path: Path,
    rolling_window: int = 3,
) -> None:
    """Plot the yearly and rolling mean of parts per set and save the figure.

    Args:
        enriched_sets_path: CSV of sets read with `read_rows`.
        milestones_path: CSV of milestones read with `load_milestones`.
        destination_path: Image file to write; its parent directory is
            created when missing.
        rolling_window: Window size passed to `compute_rolling_mean`.

    Note:
        The `min()`/`max()` calls below raise ValueError when the sets file
        yields no rows.
    """
    sets_rows = read_rows(enriched_sets_path)
    milestones = load_milestones(milestones_path)
    annual_series = compute_average_parts_per_set(sets_rows)
    rolling_series = compute_rolling_mean(annual_series, rolling_window)
    years = [year for year, _ in annual_series]
    annual_values = [value for _, value in annual_series]
    rolling_values = [value for _, value in rolling_series]
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(years, annual_values, marker="o", color="#2ca02c", label="Moyenne annuelle (pièces/set)")
    ax.plot(
        years,
        rolling_values,
        marker="^",
        color="#9467bd",
        label=f"Moyenne glissante {rolling_window} ans (pièces/set)",
    )
    ax.set_xlabel("Année")
    ax.set_ylabel("Pièces par set")
    ax.set_title("Évolution de la taille moyenne des sets (thèmes filtrés)")
    ax.grid(True, linestyle="--", alpha=0.3)
    ax.set_xlim(min(years) - 0.4, max(years) + 0.4)
    # One tick per calendar year, including years without data.
    ax.set_xticks(list(range(min(years), max(years) + 1)))
    ax.tick_params(axis="x", labelrotation=45)
    peak = max(max(annual_values), max(rolling_values))
    # Milestone labels are anchored at twice the tallest plotted value.
    top_limit = peak * 2
    milestones_in_range = sorted(
        [m for m in milestones if min(years) <= m["year"] <= max(years)],
        key=lambda m: (m["year"], m["description"]),
    )
    # Number of milestones already placed for each year.
    milestone_offsets: Dict[int, int] = {}
    offset_step = 0.4
    max_offset = 0
    for milestone in milestones_in_range:
        year = milestone["year"]
        count_for_year = milestone_offsets.get(year, 0)
        milestone_offsets[year] = count_for_year + 1
        # Labels sharing a year alternate right/left of the line, moving
        # one step further out every second label.
        horizontal_offset = offset_step * (count_for_year // 2 + 1)
        max_offset = max(max_offset, count_for_year)
        if count_for_year % 2 == 1:
            horizontal_offset *= -1
        text_x = year + horizontal_offset
        ax.axvline(year, color="#d62728", linestyle="--", linewidth=1, alpha=0.65)
        ax.text(
            text_x,
            top_limit,
            milestone["description"],
            rotation=90,
            verticalalignment="top",
            horizontalalignment="center",
            fontsize=8,
            color="#d62728",
        )
    # The upper limit grows by 2% per extra milestone stacked on a single year.
    ax.set_ylim(0, top_limit * (1 + max_offset * 0.02))
    ax.legend(loc="upper left", bbox_to_anchor=(1.12, 1))
    ensure_parent_dir(destination_path)
    fig.tight_layout()
    fig.savefig(destination_path, dpi=150)
    # Close the figure so repeated calls do not accumulate open figures.
    plt.close(fig)

196
lib/plots/sets_per_year.py Normal file
View File

@@ -0,0 +1,196 @@
"""Graphiques montrant le nombre de sets sortis par année."""
from pathlib import Path
from typing import Dict, Iterable, List, Tuple
import matplotlib.pyplot as plt
from lib.filesystem import ensure_parent_dir
from lib.milestones import load_milestones
from lib.rebrickable.stats import read_rows
def compute_sets_per_year(rows: Iterable[dict]) -> List[Tuple[int, int]]:
    """Return (year, number of sets) pairs in chronological order.

    Each row must provide a `year` value convertible to int.
    """
    tally: Dict[int, int] = {}
    for row in rows:
        year = int(row["year"])
        if year in tally:
            tally[year] += 1
        else:
            tally[year] = 1
    return [(year, tally[year]) for year in sorted(tally)]
def compute_parts_per_year(rows: Iterable[dict]) -> List[Tuple[int, int]]:
    """Return (year, total number of parts) pairs in chronological order.

    Each row must provide `year` and `num_parts` values convertible to int.
    """
    sums: Dict[int, int] = {}
    for row in rows:
        year = int(row["year"])
        previous = sums.get(year, 0)
        sums[year] = previous + int(row["num_parts"])
    return [(year, sums[year]) for year in sorted(sums)]
def plot_sets_per_year(
    enriched_sets_path: Path,
    milestones_path: Path,
    destination_path: Path,
) -> None:
    """Plot yearly set and part counts, their cumulative means and the milestones.

    The left axis shows stacked bars of owned / not-owned sets plus the
    cumulative mean number of sets per year. The right axis shows stacked
    bars of the corresponding part counts plus the cumulative mean number of
    parts per set.

    Args:
        enriched_sets_path: CSV of sets read with `read_rows`; rows must
            carry `year`, `num_parts` and `in_collection`.
        milestones_path: CSV of milestones read with `load_milestones`.
        destination_path: Image file to write; its parent directory is
            created when missing.

    Raises:
        ValueError: If the sets file yields no rows (no year range).
    """
    sets_rows = read_rows(enriched_sets_path)
    milestones = load_milestones(milestones_path)
    # Build the per-year lookups once instead of once per year of the range.
    sets_by_year = dict(compute_sets_per_year(sets_rows))
    parts_by_year = dict(compute_parts_per_year(sets_rows))
    min_year = min(sets_by_year)
    max_year = max(sets_by_year)
    # Continuous range so that years without any set appear as zero.
    years = list(range(min_year, max_year + 1))
    counts = [sets_by_year.get(year, 0) for year in years]
    parts_totals = [parts_by_year.get(year, 0) for year in years]
    owned_counts_map: Dict[int, int] = {}
    owned_parts_map: Dict[int, int] = {}
    for row in sets_rows:
        year = int(row["year"])
        if row["in_collection"] == "true":
            owned_counts_map[year] = owned_counts_map.get(year, 0) + 1
            owned_parts_map[year] = owned_parts_map.get(year, 0) + int(row["num_parts"])
    owned_counts = [owned_counts_map.get(year, 0) for year in years]
    missing_counts = [total - owned for total, owned in zip(counts, owned_counts)]
    owned_parts = [owned_parts_map.get(year, 0) for year in years]
    missing_parts = [total - owned for total, owned in zip(parts_totals, owned_parts)]
    first_non_zero_index = next(index for index, value in enumerate(counts) if value > 0)
    cumulative_mean = []
    total = 0
    for index, count in enumerate(counts):
        total += count
        cumulative_mean.append(total / (index + 1))
    cumulative_parts_mean = []
    rolling_sets = 0
    rolling_parts = 0
    for index, (count, parts) in enumerate(zip(counts, parts_totals)):
        rolling_sets += count
        rolling_parts += parts
        if index < first_non_zero_index:
            # No set seen yet: avoid dividing by zero.
            cumulative_parts_mean.append(0)
        else:
            cumulative_parts_mean.append(rolling_parts / rolling_sets)
    milestones_in_range = sorted(
        [m for m in milestones if min_year <= m["year"] <= max_year],
        key=lambda m: (m["year"], m["description"]),
    )
    fig, ax = plt.subplots(figsize=(14, 6))
    bar_width = 0.35
    # Set bars sit left of the year tick, part bars right of it.
    x_sets = [year - bar_width / 2 for year in years]
    bars_owned_sets = ax.bar(
        x_sets,
        owned_counts,
        width=bar_width,
        color="#1f77b4",
        alpha=0.9,
        label="Sets possédés",
        zorder=2,
    )
    bars_missing_sets = ax.bar(
        x_sets,
        missing_counts,
        width=bar_width,
        bottom=owned_counts,
        color="#9ecae1",
        alpha=0.8,
        label="Sets non possédés",
    )
    set_mean_line = ax.plot(
        years,
        cumulative_mean,
        color="#ff7f0e",
        marker="o",
        label="Moyenne cumulative (sets)",
        zorder=5,
    )
    ax2 = ax.twinx()
    x_parts = [year + bar_width / 2 for year in years]
    parts_bars_owned = ax2.bar(
        x_parts,
        owned_parts,
        width=bar_width,
        color="#2ca02c",
        alpha=0.9,
        label="Pièces (sets possédés)",
        zorder=2,
    )
    parts_bars_missing = ax2.bar(
        x_parts,
        missing_parts,
        width=bar_width,
        bottom=owned_parts,
        color="#c7e9c0",
        alpha=0.85,
        label="Pièces (sets non possédés)",
    )
    parts_mean_line = ax2.plot(
        years,
        cumulative_parts_mean,
        color="#9467bd",
        marker="^",
        label="Moyenne cumulative (pièces/set)",
        zorder=6,
    )
    parts_peak = max(parts_totals + [1])
    ax2.set_ylim(0, parts_peak * 1.1)
    ax.set_xlabel("Année")
    ax.set_ylabel("Nombre de sets")
    ax2.set_ylabel("Nombre de pièces")
    ax.set_title("Nombre de sets par année (thèmes filtrés)")
    ax.grid(True, linestyle="--", alpha=0.3)
    ax.set_xlim(min_year - 1, max_year + 0.4)
    ax.set_xticks(list(range(min_year, max_year + 1)))
    ax.tick_params(axis="x", labelrotation=45)
    peak = max(max(counts), max(cumulative_mean))
    # Milestone labels are anchored at twice the tallest set count.
    top_limit = peak * 2
    # Number of milestones already placed for each year.
    milestone_offsets: Dict[int, int] = {}
    offset_step = 0.3
    max_offset = 0
    for milestone in milestones_in_range:
        year = milestone["year"]
        count_for_year = milestone_offsets.get(year, 0)
        milestone_offsets[year] = count_for_year + 1
        max_offset = max(max_offset, count_for_year)
        # Labels sharing a year alternate right/left of the line, moving
        # one step further out every second label.
        horizontal_offset = offset_step * (count_for_year // 2 + 1)
        if count_for_year % 2 == 1:
            horizontal_offset *= -1
        text_x = year + horizontal_offset
        ax.axvline(year, color="#d62728", linestyle="--", linewidth=1, alpha=0.65)
        ax.text(
            text_x,
            top_limit,
            milestone["description"],
            rotation=90,
            verticalalignment="top",
            horizontalalignment="center",
            fontsize=8,
            color="#d62728",
        )
    ax.set_ylim(0, top_limit * (1 + max_offset * 0.02))
    # Explicit handles so that artists from both axes share one legend.
    handles = [
        bars_owned_sets,
        bars_missing_sets,
        parts_bars_owned,
        parts_bars_missing,
        set_mean_line[0],
        parts_mean_line[0],
    ]
    labels = [
        "Sets possédés",
        "Sets non possédés",
        "Pièces (sets possédés)",
        "Pièces (sets non possédés)",
        "Moyenne cumulative (sets)",
        "Moyenne cumulative (pièces/set)",
    ]
    ax.legend(handles, labels, loc="upper left", bbox_to_anchor=(1.12, 1))
    ensure_parent_dir(destination_path)
    fig.tight_layout()
    fig.savefig(destination_path, dpi=150)
    # Close the figure so repeated calls do not accumulate open figures.
    plt.close(fig)

View File

@@ -0,0 +1 @@
"""Fonctionnalités liées aux données Rebrickable."""

View File

@@ -0,0 +1,47 @@
"""Outils de téléchargement pour les fichiers fournis par Rebrickable."""
from datetime import datetime, timedelta
from pathlib import Path
from typing import Iterable, List
import gzip
import shutil
import requests
# Base URL of the Rebrickable download directory; file names are appended to it.
REBRICKABLE_BASE_URL = "https://cdn.rebrickable.com/media/downloads/"
# Chunk size passed to the streamed HTTP response when writing the archive.
CHUNK_SIZE = 8192
# Maximum age, in days, of a decompressed file before it is downloaded again.
CACHE_TTL = 7
def build_rebrickable_url(file_name: str) -> str:
    """Return the full download URL of a Rebrickable file given its name."""
    return "{}{}".format(REBRICKABLE_BASE_URL, file_name)
def download_rebrickable_file(file_name: str, destination_dir: Path, timeout: float = 60.0) -> Path:
    """Download a Rebrickable archive, decompress it and delete the archive.

    A decompressed file no older than `CACHE_TTL` days is reused as is.

    Args:
        file_name: Name of the gzip archive on the Rebrickable server.
        destination_dir: Directory receiving the files; created if missing.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the caller forever.

    Returns:
        Path of the decompressed file (the archive path without its last
        suffix).

    Raises:
        requests.RequestException: On network errors, timeouts or HTTP
            error statuses.
        OSError: If the archive cannot be written or decompressed.
    """
    target_path = destination_dir / file_name
    destination_dir.mkdir(parents=True, exist_ok=True)
    decompressed_path = target_path.with_suffix("")
    if decompressed_path.exists():
        cache_age = datetime.now() - datetime.fromtimestamp(decompressed_path.stat().st_mtime)
        if cache_age <= timedelta(days=CACHE_TTL):
            # Drop any archive left over from an earlier run.
            if target_path.exists():
                target_path.unlink()
            return decompressed_path
    # Decompress into a sibling file first: a failure must never leave a
    # truncated file that would later pass for a fresh cache entry.
    partial_path = decompressed_path.with_name(decompressed_path.name + ".part")
    try:
        with requests.get(build_rebrickable_url(file_name), stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with target_path.open("wb") as target_file:
                for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
                    target_file.write(chunk)
        with gzip.open(target_path, "rb") as compressed_file, partial_path.open("wb") as decompressed_file:
            shutil.copyfileobj(compressed_file, decompressed_file)
        partial_path.replace(decompressed_path)
    finally:
        # Always remove the archive and any incomplete output.
        target_path.unlink(missing_ok=True)
        partial_path.unlink(missing_ok=True)
    return decompressed_path
def download_rebrickable_files(file_names: Iterable[str], destination_dir: Path) -> List[Path]:
    """Download several Rebrickable archives one after another.

    Returns the decompressed file paths in the order of `file_names`.
    """
    downloaded: List[Path] = []
    for file_name in file_names:
        downloaded.append(download_rebrickable_file(file_name, destination_dir))
    return downloaded

View File

@@ -0,0 +1,86 @@
"""Enrichissement des sets LEGO avec des métadonnées Rebrickable et personnelles."""
import csv
from pathlib import Path
from typing import Iterable, Set
from lib.filesystem import ensure_parent_dir
# Base URL of the public Rebrickable set pages; the set number is appended to it.
REBRICKABLE_SET_BASE_URL = "https://rebrickable.com/sets/"
def extract_set_id(set_num: str) -> str:
    """Return the part of `set_num` that precedes the first "-".

    The whole string is returned when it contains no "-".
    """
    set_id, _, _ = set_num.partition("-")
    return set_id
def build_rebrickable_set_url(set_num: str) -> str:
    """Return the public Rebrickable URL of a set given its full set number."""
    return "{}{}".format(REBRICKABLE_SET_BASE_URL, set_num)
def parse_set_collection_root(raw_value: str) -> Path | None:
"""Prépare le chemin de collection, ou None si aucune collection n'est fournie."""
cleaned = raw_value.strip()
if not cleaned:
print("La variable MY_SETS est vide, aucun set en collection.")
return None
return Path(cleaned)
def load_owned_set_ids(collection_root: Path) -> Set[str]:
    """Return the names of the sub-directories of a collection folder.

    An empty set is returned, after printing a notice, when the path does
    not exist, is not a directory, or contains no sub-directory.
    """
    if not collection_root.exists():
        notice = f"Le dossier {collection_root} n'existe pas, aucun set en collection."
    elif not collection_root.is_dir():
        notice = f"Le chemin {collection_root} n'est pas un dossier, aucun set en collection."
    else:
        names = {child.name for child in collection_root.iterdir() if child.is_dir()}
        if names:
            return names
        notice = f"Le dossier {collection_root} est vide, aucun set en collection."
    print(notice)
    return set()
def enrich_sets(
    source_path: Path,
    destination_path: Path,
    owned_set_ids: Iterable[str],
) -> None:
    """Copy the filtered catalogue, adding three columns to every row.

    The added columns are `set_id` (from `extract_set_id`),
    `rebrickable_url` (from `build_rebrickable_set_url`) and `in_collection`
    ("true" or "false", depending on whether the set id is in
    `owned_set_ids`).
    """
    ensure_parent_dir(destination_path)
    owned_lookup = set(owned_set_ids)
    with source_path.open() as source_file, destination_path.open("w", newline="") as target_file:
        reader = csv.DictReader(source_file)
        columns = reader.fieldnames + ["set_id", "rebrickable_url", "in_collection"]
        writer = csv.DictWriter(target_file, fieldnames=columns)
        writer.writeheader()
        for source_row in reader:
            set_id = extract_set_id(source_row["set_num"])
            enriched_row = dict(source_row)
            enriched_row["set_id"] = set_id
            enriched_row["rebrickable_url"] = build_rebrickable_set_url(source_row["set_num"])
            enriched_row["in_collection"] = "true" if set_id in owned_lookup else "false"
            writer.writerow(enriched_row)
def write_missing_sets_markdown(enriched_path: Path, destination_path: Path) -> None:
    """Write a Markdown table of the sets that are not in the collection.

    Args:
        enriched_path: Enriched sets CSV; rows whose `in_collection` is
            "false" are listed.
        destination_path: Markdown file to write; its parent directory is
            created when missing.
    """
    with enriched_path.open() as source_file:
        reader = csv.DictReader(source_file)
        rows = [
            row
            for row in reader
            if row["in_collection"] == "false"
        ]
    ensure_parent_dir(destination_path)
    with destination_path.open("w") as target_file:
        target_file.write("| set_id | year | name |\n")
        target_file.write("| --- | --- | --- |\n")
        for row in rows:
            link = f"[{row['set_id']}]({row['rebrickable_url']})"
            # A literal "|" in a name would otherwise split the table cell.
            name = row["name"].replace("|", "\\|")
            target_file.write(f"| {link} | {row['year']} | {name} |\n")

View File

@@ -0,0 +1,41 @@
"""Filtrage des sets LEGO par identifiants de thèmes Rebrickable."""
import csv
from pathlib import Path
from typing import Dict, Iterable, List
from lib.filesystem import ensure_parent_dir
def parse_theme_ids(raw_value: str) -> List[str]:
    """Split a comma-separated string into theme identifiers.

    Whitespace around each identifier is removed and empty entries are
    dropped.

    Raises:
        ValueError: If no identifier remains.
    """
    identifiers: List[str] = []
    for chunk in raw_value.split(","):
        token = chunk.strip()
        if token:
            identifiers.append(token)
    if identifiers:
        return identifiers
    raise ValueError("Au moins un identifiant de thème est requis.")
def filter_sets_by_theme(
    source_path: Path,
    destination_path: Path,
    theme_ids: Iterable[str],
    overrides_path: Path,
) -> None:
    """Write the sets of the selected themes that have at least one part.

    A row is kept when its `theme_id` is among `theme_ids` and its catalogue
    `num_parts` is greater than zero. For kept rows, `num_parts` is replaced
    by the value from the overrides file when the set number appears there.
    """
    ensure_parent_dir(destination_path)
    allowed_ids = set(theme_ids)
    overrides = load_num_parts_overrides(overrides_path)
    with source_path.open() as source_file, destination_path.open("w", newline="") as target_file:
        reader = csv.DictReader(source_file)
        writer = csv.DictWriter(target_file, fieldnames=reader.fieldnames)
        writer.writeheader()
        for row in reader:
            if row["theme_id"] not in allowed_ids:
                continue
            if int(row["num_parts"]) <= 0:
                continue
            corrected = overrides.get(row["set_num"])
            output_row = dict(row)
            if corrected is not None:
                output_row["num_parts"] = corrected
            writer.writerow(output_row)
def load_num_parts_overrides(overrides_path: Path) -> Dict[str, str]:
    """Load the part-count corrections, keyed by set number.

    The CSV must have `set_num` and `num_parts` columns. Values are kept as
    text, and a later row wins when a set number is repeated.
    """
    corrections: Dict[str, str] = {}
    with overrides_path.open() as overrides_file:
        for row in csv.DictReader(overrides_file):
            corrections[row["set_num"]] = row["num_parts"]
    return corrections

View File

@@ -0,0 +1,107 @@
"""Rapport des écarts entre catalogue et inventaire agrégé."""
import csv
from pathlib import Path
from typing import Dict, Iterable, List
from lib.filesystem import ensure_parent_dir
def load_sets(sets_path: Path) -> List[dict]:
    """Load every row of the filtered sets CSV as a dictionary."""
    with sets_path.open() as sets_file:
        loaded = [row for row in csv.DictReader(sets_file)]
    return loaded
def index_sets_by_num(sets: Iterable[dict]) -> Dict[str, dict]:
    """Index set rows by their full set number.

    A later row replaces an earlier one that has the same `set_num`.
    """
    index: Dict[str, dict] = {}
    for set_row in sets:
        index[set_row["set_num"]] = set_row
    return index
def compute_inventory_totals(parts_path: Path, include_spares: bool) -> Dict[str, int]:
    """Sum `quantity_in_set` per set number from the parts CSV.

    Rows whose `is_spare` is "true" are left out unless `include_spares` is
    set.
    """
    sums: Dict[str, int] = {}
    with parts_path.open() as parts_file:
        for row in csv.DictReader(parts_file):
            if include_spares or row["is_spare"] != "true":
                key = row["set_num"]
                sums[key] = sums.get(key, 0) + int(row["quantity_in_set"])
    return sums
def compute_inventory_gaps(sets_path: Path, parts_path: Path) -> List[dict]:
    """List the sets whose inventory total differs from the catalogue.

    A set is reported when its catalogue `num_parts` differs from the
    inventory total including spares. Sets with no matching row in the parts
    file are counted as having 0 inventory parts instead of raising KeyError.

    Args:
        sets_path: Enriched sets CSV (`set_num`, `set_id`, `num_parts`,
            `in_collection`).
        parts_path: Parts CSV read by `compute_inventory_totals`.

    Returns:
        One dict per set in gap, with the expected and inventoried totals
        and their absolute differences, with and without spares.
    """
    sets = load_sets(sets_path)
    totals_with_spares = compute_inventory_totals(parts_path, include_spares=True)
    totals_without_spares = compute_inventory_totals(parts_path, include_spares=False)
    gaps: List[dict] = []
    for set_row in sets:
        set_num = set_row["set_num"]
        expected_parts = int(set_row["num_parts"])
        # A set missing from either total is exactly the kind of gap this report is for.
        inventory_parts_with_spares = totals_with_spares.get(set_num, 0)
        inventory_parts_non_spare = totals_without_spares.get(set_num, 0)
        if expected_parts == inventory_parts_with_spares:
            continue
        gaps.append(
            {
                "set_num": set_num,
                "set_id": set_row["set_id"],
                "expected_parts": expected_parts,
                "inventory_parts": inventory_parts_with_spares,
                "inventory_parts_non_spare": inventory_parts_non_spare,
                "delta": abs(expected_parts - inventory_parts_with_spares),
                "delta_non_spare": abs(expected_parts - inventory_parts_non_spare),
                "in_collection": set_row["in_collection"],
            }
        )
    return gaps
def write_inventory_gaps_csv(destination_path: Path, gaps: Iterable[dict]) -> None:
    """Write the inventory gaps to a CSV file with a fixed column order.

    The parent directory of `destination_path` is created when missing.
    """
    columns = [
        "set_num",
        "set_id",
        "expected_parts",
        "inventory_parts",
        "inventory_parts_non_spare",
        "delta",
        "delta_non_spare",
        "in_collection",
    ]
    ensure_parent_dir(destination_path)
    with destination_path.open("w", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=columns)
        writer.writeheader()
        writer.writerows(gaps)
def build_instructions_url(set_id: str) -> str:
    """Return the LEGO building-instructions URL built from a set id."""
    return "https://www.lego.com/service/buildinginstructions/{}".format(set_id)
def write_inventory_gaps_markdown(
    destination_path: Path,
    gaps: Iterable[dict],
    sets_by_num: Dict[str, dict],
) -> None:
    """Write a Markdown table of the sets with an inventory gap.

    Gaps whose `delta_non_spare` is 0 are left out.

    Args:
        destination_path: Markdown file to write; its parent directory is
            created when missing.
        gaps: Gap records as produced by `compute_inventory_gaps`.
        sets_by_num: Set rows indexed by full set number, providing
            `name`, `year` and `rebrickable_url`.
    """
    ensure_parent_dir(destination_path)
    with destination_path.open("w") as markdown_file:
        markdown_file.write(
            "| set_id | name | year | delta (spares inclus) | delta (spares exclus) | expected_parts | inventory_parts | inventory_parts_non_spare | in_collection | instructions |\n"
        )
        markdown_file.write("| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |\n")
        for row in gaps:
            if row["delta_non_spare"] == 0:
                continue
            set_row = sets_by_num[row["set_num"]]
            set_link = f"[{row['set_id']}]({set_row['rebrickable_url']})"
            instructions_link = f"[PDF]({build_instructions_url(row['set_id'])})"
            # A literal "|" in a name would otherwise split the table cell.
            name = set_row["name"].replace("|", "\\|")
            markdown_file.write(
                f"| {set_link} | {name} | {set_row['year']} | {row['delta']} | {row['delta_non_spare']} | "
                f"{row['expected_parts']} | {row['inventory_parts']} | {row['inventory_parts_non_spare']} | "
                f"{row['in_collection']} | {instructions_link} |\n"
            )

View File

@@ -0,0 +1,143 @@
"""Construction d'un inventaire détaillé des pièces par set."""
import csv
from pathlib import Path
from typing import Dict, List
from lib.filesystem import ensure_parent_dir
def normalize_boolean(raw_value: str) -> str:
    """Return the lowercase form of a textual boolean value."""
    lowered = raw_value.lower()
    return lowered
def select_latest_inventories(inventories_path: Path) -> Dict[str, dict]:
    """Keep, for every set number, the inventory with the highest version.

    Returns a mapping from `set_num` to a dict holding the `id` and
    `version` of the retained inventory, both as text. On equal versions the
    first row read is kept.
    """
    best: Dict[str, dict] = {}
    with inventories_path.open() as inventories_file:
        for row in csv.DictReader(inventories_file):
            set_num = row["set_num"]
            known = best.get(set_num)
            if known is not None and int(row["version"]) <= int(known["version"]):
                continue
            best[set_num] = {"id": row["id"], "version": row["version"]}
    return best
def build_color_lookup(colors_path: Path) -> Dict[str, dict]:
    """Index the colours CSV by colour id.

    Each entry holds the `rgb` code and an `is_translucent` flag, which is
    the `is_trans` column passed through `normalize_boolean`.
    """
    with colors_path.open() as colors_file:
        return {
            row["id"]: {
                "rgb": row["rgb"],
                "is_translucent": normalize_boolean(row["is_trans"]),
            }
            for row in csv.DictReader(colors_file)
        }
def index_inventory_parts_by_inventory(inventory_parts_path: Path) -> Dict[str, List[dict]]:
    """Group the inventory-part rows by their `inventory_id`.

    Rows keep their file order inside each group.
    """
    grouped: Dict[str, List[dict]] = {}
    with inventory_parts_path.open() as parts_file:
        for row in csv.DictReader(parts_file):
            grouped.setdefault(row["inventory_id"], []).append(row)
    return grouped
def index_inventory_minifigs_by_inventory(inventory_minifigs_path: Path) -> Dict[str, List[dict]]:
    """Group the inventory-minifig rows by their `inventory_id`.

    Rows keep their file order inside each group.
    """
    grouped: Dict[str, List[dict]] = {}
    with inventory_minifigs_path.open() as minifigs_file:
        for row in csv.DictReader(minifigs_file):
            grouped.setdefault(row["inventory_id"], []).append(row)
    return grouped
def build_minifig_lookup(minifigs_path: Path) -> Dict[str, dict]:
    """Index the rows of the minifigs CSV by `fig_num`.

    A later row replaces an earlier one that has the same `fig_num`.
    """
    with minifigs_path.open() as minifigs_file:
        return {row["fig_num"]: row for row in csv.DictReader(minifigs_file)}
def _build_part_row(part_row: dict, set_row: dict, colors: Dict[str, dict], quantity: str) -> dict:
    """Build one output row describing a part of a set, with its colour resolved."""
    color = colors[part_row["color_id"]]
    return {
        "part_num": part_row["part_num"],
        "color_rgb": color["rgb"],
        "is_translucent": color["is_translucent"],
        "set_num": set_row["set_num"],
        "set_id": set_row["set_id"],
        "quantity_in_set": quantity,
        "is_spare": normalize_boolean(part_row["is_spare"]),
    }


def write_parts_filtered(
    sets_path: Path,
    inventories_path: Path,
    inventory_parts_path: Path,
    colors_path: Path,
    inventory_minifigs_path: Path,
    minifigs_path: Path,
    destination_path: Path,
) -> None:
    """Write a CSV listing the parts of every set, with resolved colours.

    For each set, the parts of its latest inventory are written. When the
    non-spare quantity of that inventory is below the catalogue `num_parts`,
    the parts of the minifigs attached to the inventory are written as well,
    multiplied by the minifig quantity.

    Args:
        sets_path: Enriched sets CSV (`set_num`, `set_id`, `num_parts`).
        inventories_path: Rebrickable inventories CSV.
        inventory_parts_path: Rebrickable inventory-parts CSV.
        colors_path: Rebrickable colours CSV.
        inventory_minifigs_path: Rebrickable inventory-minifigs CSV.
        minifigs_path: Rebrickable minifigs CSV.
        destination_path: CSV file to write; its parent directory is
            created when missing.

    Raises:
        KeyError: If a set, minifig, inventory or colour has no entry in
            the corresponding lookup.
    """
    latest_inventories = select_latest_inventories(inventories_path)
    parts_by_inventory = index_inventory_parts_by_inventory(inventory_parts_path)
    minifigs_by_inventory = index_inventory_minifigs_by_inventory(inventory_minifigs_path)
    # Not referenced below; kept so an unreadable minifigs file still fails here as before.
    minifigs = build_minifig_lookup(minifigs_path)
    colors = build_color_lookup(colors_path)
    ensure_parent_dir(destination_path)
    with sets_path.open() as sets_file, destination_path.open("w", newline="") as target_file:
        sets_reader = csv.DictReader(sets_file)
        fieldnames = [
            "part_num",
            "color_rgb",
            "is_translucent",
            "set_num",
            "set_id",
            "quantity_in_set",
            "is_spare",
        ]
        writer = csv.DictWriter(target_file, fieldnames=fieldnames)
        writer.writeheader()
        for set_row in sets_reader:
            inventory = latest_inventories[set_row["set_num"]]
            inventory_parts = parts_by_inventory[inventory["id"]]
            inventory_total_non_spare = sum(
                int(part_row["quantity"])
                for part_row in inventory_parts
                if normalize_boolean(part_row["is_spare"]) == "false"
            )
            expected_parts = int(set_row["num_parts"])
            for part_row in inventory_parts:
                writer.writerow(_build_part_row(part_row, set_row, colors, part_row["quantity"]))
            if inventory_total_non_spare < expected_parts:
                # The set inventory alone does not reach the catalogue count:
                # add the parts of its minifigs.
                for minifig_row in minifigs_by_inventory.get(inventory["id"], []):
                    minifig_inventory = latest_inventories[minifig_row["fig_num"]]
                    minifig_parts = parts_by_inventory[minifig_inventory["id"]]
                    for part_row in minifig_parts:
                        quantity = str(int(part_row["quantity"]) * int(minifig_row["quantity"]))
                        writer.writerow(_build_part_row(part_row, set_row, colors, quantity))

View File

@@ -0,0 +1,101 @@
"""Calculs de statistiques simples sur les pièces filtrées."""
import csv
from collections import defaultdict
from pathlib import Path
from typing import Dict, Iterable, List, Sequence, Tuple
from lib.filesystem import ensure_parent_dir
from lib.rebrickable.inventory_reconciliation import compute_inventory_gaps
from lib.rebrickable.stats import read_rows as read_stats_rows
def read_rows(path: Path) -> List[dict]:
    """Load a CSV file into memory as a list of dictionaries.

    Args:
        path: Location of the CSV file; its first line is used as the header.

    Returns:
        One dict per data row, keyed by the header names.
    """
    # newline="" hands line-ending handling to the csv module, so line breaks
    # embedded in quoted fields come back unchanged instead of being
    # rewritten by universal-newline translation.
    with path.open(newline="") as csv_file:
        return list(csv.DictReader(csv_file))
def select_non_spare_parts(rows: Iterable[dict]) -> List[dict]:
    """Return the rows that are not spare parts.

    A row is kept only when its ``is_spare`` field is exactly the string
    ``"false"``; input order is preserved.
    """
    kept: List[dict] = []
    for entry in rows:
        if entry["is_spare"] != "false":
            continue
        kept.append(entry)
    return kept
def variation_key(row: dict) -> Tuple[str, str, str]:
    """Build the identity of a part variation: (part_num, color_rgb, is_translucent)."""
    part_num = row["part_num"]
    color_rgb = row["color_rgb"]
    translucency = row["is_translucent"]
    return part_num, color_rgb, translucency
def color_key(row: dict) -> Tuple[str, str]:
    """Build the identity of a color: (color_rgb, is_translucent)."""
    return tuple(row[field] for field in ("color_rgb", "is_translucent"))
def aggregate_quantities_by_variation(rows: Iterable[dict]) -> Dict[Tuple[str, str, str], int]:
    """Sum ``quantity_in_set`` per part variation over the given rows.

    No filtering happens here: every row passed in is counted, so spare parts
    must be removed by the caller beforehand if they are to be excluded.

    Returns:
        A ``defaultdict(int)`` mapping each ``variation_key`` to its total.
    """
    totals: Dict[Tuple[str, str, str], int] = defaultdict(int)
    for entry in rows:
        key = variation_key(entry)
        totals[key] = totals[key] + int(entry["quantity_in_set"])
    return totals
def read_total_filtered_parts(stats_path: Path) -> int:
    """Read the expected total part count for the filtered themes from a stats CSV.

    The value is taken from the first row whose ``libelle`` column matches the
    label below, and its ``valeur`` column is converted to ``int``.

    Raises:
        StopIteration: if no row carries that label (``next`` has no default).
        ValueError: if the stored value is not an integer literal.
    """
    wanted_label = "Total de pièces pour les thèmes filtrés"
    matching_values = (
        entry["valeur"] for entry in read_stats_rows(stats_path) if entry["libelle"] == wanted_label
    )
    return int(next(matching_values))
def build_stats(
    rows: Iterable[dict],
    sets_path: Path,
    parts_path: Path,
    stats_path: Path,
) -> List[Tuple[str, str]]:
    """Assemble the headline statistics for the filtered parts and inventory gaps.

    Args:
        rows: Part rows; spares are dropped before any aggregation.
        sets_path: Forwarded to ``compute_inventory_gaps``.
        parts_path: Forwarded to ``compute_inventory_gaps``.
        stats_path: Stats CSV from which the catalog part total is read.

    Returns:
        (label, value) pairs, both strings, in display order.

    Raises:
        ValueError: if no non-spare row is present (``min`` over an empty mapping).
    """

    def describe(key: Tuple[str, str, str], quantity: int) -> str:
        # Renders "part_num / color_rgb / is_translucent (quantity)".
        return f"{key[0]} / {key[1]} / {key[2]} ({quantity})"

    regular_rows = select_non_spare_parts(rows)
    totals_by_variation = aggregate_quantities_by_variation(regular_rows)
    variation_count = len(totals_by_variation)
    distinct_colors = {color_key(entry) for entry in regular_rows}

    # On ties, min/max keep the first variation encountered in insertion order.
    rarest_key, rarest_quantity = min(totals_by_variation.items(), key=lambda item: item[1])
    commonest_key, commonest_quantity = max(totals_by_variation.items(), key=lambda item: item[1])
    overall_quantity = sum(totals_by_variation.values())

    gaps = compute_inventory_gaps(sets_path, parts_path)
    gap_count = len(gaps)
    if gap_count == 0:
        # Placeholder so the last statistic can still be rendered.
        worst_gap = {"set_id": "none", "delta": 0}
    else:
        worst_gap = max(gaps, key=lambda gap: gap["delta"])

    # Difference between the catalog total and what the inventories add up to.
    catalog_inventory_delta = read_total_filtered_parts(stats_path) - overall_quantity

    return [
        ("Total de variations de pièces (hors rechanges)", str(variation_count)),
        ("Pièce la moins utilisée (référence + couleur)", describe(rarest_key, rarest_quantity)),
        ("Pièce la plus commune (référence + couleur)", describe(commonest_key, commonest_quantity)),
        ("Total de couleurs utilisées (hors rechanges)", str(len(distinct_colors))),
        ("Total de pièces hors rechanges", str(overall_quantity)),
        (
            "Ecart total catalogue (stats) - inventaire (hors rechanges)",
            str(catalog_inventory_delta),
        ),
        ("Nombre de sets en écart inventaire/catalogue", str(gap_count)),
        ("Ecart maximal inventaire/catalogue", f"{worst_gap['set_id']} ({worst_gap['delta']})"),
    ]
def write_parts_stats(destination_path: Path, stats: Sequence[Tuple[str, str]]) -> None:
    """Write the statistics to a two-column CSV (``libelle``, ``valeur``).

    The parent directory is prepared through ``ensure_parent_dir`` first, and
    the destination is opened in write mode, replacing any existing content.
    """
    ensure_parent_dir(destination_path)
    header = ["libelle", "valeur"]
    with destination_path.open("w", newline="") as csv_file:
        out = csv.writer(csv_file)
        out.writerow(header)
        out.writerows([label, value] for label, value in stats)

122
lib/rebrickable/stats.py Normal file
View File

@@ -0,0 +1,122 @@
"""Calcul des statistiques de base sur les sets LEGO filtrés."""
import csv
from pathlib import Path
from typing import Iterable, List, Sequence, Tuple
from lib.filesystem import ensure_parent_dir
def read_rows(path: Path) -> List[dict]:
    """Load a CSV file into memory as a list of dictionaries.

    Args:
        path: Location of the CSV file; its first line is used as the header.

    Returns:
        One dict per data row, keyed by the header names.
    """
    # newline="" hands line-ending handling to the csv module, so line breaks
    # embedded in quoted fields come back unchanged instead of being
    # rewritten by universal-newline translation.
    with path.open(newline="") as csv_file:
        return list(csv.DictReader(csv_file))
def write_stats_csv(destination_path: Path, stats: Sequence[Tuple[str, str]]) -> None:
    """Write the statistics to a two-column CSV (``libelle``, ``valeur``).

    The parent directory is prepared through ``ensure_parent_dir`` first, and
    the destination is opened in write mode, replacing any existing content.
    """
    ensure_parent_dir(destination_path)
    header = ["libelle", "valeur"]
    with destination_path.open("w", newline="") as csv_file:
        out = csv.writer(csv_file)
        out.writerow(header)
        out.writerows([label, value] for label, value in stats)
def compute_median(values: List[int]) -> float:
    """Return the median of a list of integers as a float.

    The input is left untouched; a sorted copy is used. For an even number of
    values the result is the mean of the two central ones.

    Raises:
        IndexError: if ``values`` is empty.
    """
    ordered = sorted(values)
    half, is_odd = divmod(len(ordered), 2)
    if is_odd:
        return float(ordered[half])
    lower, upper = ordered[half - 1], ordered[half]
    return (lower + upper) / 2
def _mean_or_zero(values: List[int]) -> float:
    """Return the arithmetic mean of *values*, or 0.0 when the list is empty."""
    return sum(values) / len(values) if values else 0.0


def compute_basic_stats(
    themes: Iterable[dict],
    all_sets: Iterable[dict],
    filtered_sets: Iterable[dict],
    enriched_sets: Iterable[dict],
) -> List[Tuple[str, str]]:
    """Compute the headline statistics from the loaded themes and sets.

    Args:
        themes: Theme rows of the full catalog; only their count is used.
        all_sets: Set rows of the full catalog; only their count is used.
        filtered_sets: Set rows kept after theme filtering. The fields read
            are ``num_parts``, ``year``, ``theme_id``, ``set_num`` and ``name``.
        enriched_sets: Set rows carrying an ``in_collection`` flag
            (``"true"`` / ``"false"``) next to ``num_parts``.

    Returns:
        (label, value) pairs, both strings, in display order.

    Raises:
        ZeroDivisionError: if ``themes``, ``all_sets`` or ``filtered_sets`` is empty.
    """
    themes_list = list(themes)
    all_sets_list = list(all_sets)
    filtered_sets_list = list(filtered_sets)
    enriched_sets_list = list(enriched_sets)

    theme_count_total = len(themes_list)
    total_sets = len(all_sets_list)
    filtered_sets_count = len(filtered_sets_list)
    avg_sets_per_theme = total_sets / theme_count_total
    percent_filtered = (filtered_sets_count / total_sets) * 100

    # One list per ownership state provides both the set counts and the part totals.
    owned_parts = [int(row["num_parts"]) for row in enriched_sets_list if row["in_collection"] == "true"]
    missing_parts = [int(row["num_parts"]) for row in enriched_sets_list if row["in_collection"] == "false"]
    owned_sets_count = len(owned_parts)
    missing_sets_count = len(missing_parts)
    percent_owned = (owned_sets_count / filtered_sets_count) * 100

    parts_per_set = [int(row["num_parts"]) for row in filtered_sets_list]
    total_parts = sum(parts_per_set)
    avg_parts_per_set = total_parts / filtered_sets_count
    median_parts_per_set = compute_median(parts_per_set)

    years = [int(row["year"]) for row in filtered_sets_list]
    avg_sets_per_year = filtered_sets_count / len(set(years))
    theme_ids_filtered = {row["theme_id"] for row in filtered_sets_list}
    min_year = str(min(years))
    max_year = str(max(years))
    year_counts = {}
    for year in years:
        year_counts[year] = year_counts.get(year, 0) + 1
    # Highest count wins; negating the year makes the earliest year win a tie.
    prolific_year, prolific_count = max(year_counts.items(), key=lambda item: (item[1], -item[0]))

    richest_set = max(filtered_sets_list, key=lambda row: int(row["num_parts"]))
    lightest_set = min(filtered_sets_list, key=lambda row: int(row["num_parts"]))
    # set_num breaks ties between sets released the same year.
    oldest_set = min(filtered_sets_list, key=lambda row: (int(row["year"]), row["set_num"]))
    latest_set = max(filtered_sets_list, key=lambda row: (int(row["year"]), row["set_num"]))

    # A collection holding none (or all) of the sets leaves one of these lists
    # empty; report 0.00 instead of failing with ZeroDivisionError.
    avg_parts_owned = _mean_or_zero(owned_parts)
    avg_parts_missing = _mean_or_zero(missing_parts)
    total_parts_owned = sum(owned_parts)
    # Same guard for the degenerate case where the filtered sets total 0 parts.
    percent_parts_owned = (total_parts_owned / total_parts) * 100 if total_parts else 0.0

    return [
        ("Nombre total de sets (catalogue complet)", str(total_sets)),
        ("Nombre total de thèmes (catalogue complet)", str(theme_count_total)),
        ("Nombre de sets après filtrage (thèmes ciblés)", str(filtered_sets_count)),
        ("Nombre moyen de sets par thème (catalogue complet)", f"{avg_sets_per_theme:.2f}"),
        ("Pourcentage des sets filtrés vs total", f"{percent_filtered:.2f}%"),
        ("Taux de possession (thèmes filtrés)", f"{percent_owned:.2f}%"),
        ("Sets dans la collection", str(owned_sets_count)),
        ("Sets manquants pour la collection", str(missing_sets_count)),
        ("Nombre moyen de pièces par set (thèmes filtrés)", f"{avg_parts_per_set:.2f}"),
        ("Médiane de pièces par set (thèmes filtrés)", f"{median_parts_per_set:.2f}"),
        ("Nombre moyen de sets commercialisés par an (thèmes filtrés)", f"{avg_sets_per_year:.2f}"),
        ("Total de pièces pour les thèmes filtrés", str(total_parts)),
        ("Total de pièces des sets possédés", str(total_parts_owned)),
        ("Pourcentage de pièces possédées (thèmes filtrés)", f"{percent_parts_owned:.2f}%"),
        ("Nombre de thèmes filtrés", str(len(theme_ids_filtered))),
        ("Première année de sortie (thèmes filtrés)", min_year),
        ("Dernière année de sortie (thèmes filtrés)", max_year),
        ("Année la plus prolifique (thèmes filtrés)", f"{prolific_year} ({prolific_count} sets)"),
        (
            "Set avec le plus de pièces (thèmes filtrés)",
            f"{richest_set['set_num']} - {richest_set['name']} ({richest_set['num_parts']} pièces)",
        ),
        (
            "Set avec le moins de pièces (thèmes filtrés)",
            f"{lightest_set['set_num']} - {lightest_set['name']} ({lightest_set['num_parts']} pièces)",
        ),
        (
            "Set le plus ancien (thèmes filtrés)",
            f"{oldest_set['set_num']} - {oldest_set['name']} ({oldest_set['year']})",
        ),
        (
            "Set le plus récent (thèmes filtrés)",
            f"{latest_set['set_num']} - {latest_set['name']} ({latest_set['year']})",
        ),
        (
            "Nombre moyen de pièces des sets possédés",
            f"{avg_parts_owned:.2f}",
        ),
        (
            "Nombre moyen de pièces des sets manquants",
            f"{avg_parts_missing:.2f}",
        ),
    ]