1

Compare commits

...

6 Commits

54 changed files with 618 additions and 6 deletions

View File

@@ -383,3 +383,16 @@ Le calcul lit `data/intermediate/parts_filtered.csv`, `data/raw/parts.csv`, `dat
Le téléchargement s'appuie sur `REBRICKABLE_TOKEN` et place les visuels des pièces dans `figures/rebrickable/{set_id}/rare_parts/{part_num}.jpg`, en journalisant les manques dans `data/intermediate/part_rarity_download_log.csv`.
Le tracé `figures/step34/part_rarity.png` juxtapose, pour chaque pièce de `part_rarity_exclusive.csv`, les occurrences dans les sets filtrés vs le reste du catalogue avec les images incrustées.
### Étape 35 : planches d'autocollants (collage)
1. `source .venv/bin/activate`
2. `python -m scripts.compute_sticker_parts`
3. `python -m scripts.download_sticker_resources`
4. `python -m scripts.plot_sticker_sheets`
Le calcul lit `data/intermediate/parts_filtered.csv`, `data/raw/parts.csv` et `data/intermediate/sets_enriched.csv`, conserve les pièces de catégorie 58 (stickers) hors rechanges et produit `data/intermediate/sticker_parts.csv` avec set, année, nom, référence et quantité.
Le téléchargement s'appuie sur `REBRICKABLE_TOKEN` et enregistre les visuels dans `figures/rebrickable/{set_id}/stickers/{part_num}.jpg`, en journalisant les manques dans `data/intermediate/sticker_download_log.csv` (cache partagé `data/intermediate/part_img_cache.csv`).
Le collage `figures/step35/sticker_sheets.png` assemble toutes les planches trouvées (triées par année puis set) avec, sous chaque image, l'année, l'identifiant de set et la référence de la planche.

Binary file not shown.

After

Width:  |  Height:  |  Size: 263 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 132 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 129 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 120 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 123 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 221 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 200 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 117 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 126 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 189 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 164 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 197 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 185 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 243 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 191 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 248 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 109 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 172 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 248 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 226 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 190 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 406 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 194 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 177 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 112 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 193 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 136 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 131 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 152 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 268 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 128 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 126 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 251 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 70 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.0 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.3 MiB

View File

@@ -101,6 +101,41 @@ def plot_part_categories_heatmap(categories_by_year_path: Path, destination_path
plt.close(fig) plt.close(fig)
def plot_part_categories_heatmap_log(categories_by_year_path: Path, destination_path: Path) -> None:
    """Render a year-by-category heatmap of log1p quantities, dropping empty categories."""
    rows = load_rows(categories_by_year_path)
    years = extract_years(rows)
    # Per-cell quantities keyed by (year, category) for O(1) lookup below.
    qty_by_cell = {(entry["year"], entry["category_id"]): int(entry["quantity_non_spare"]) for entry in rows}
    category_totals: Dict[str, int] = {}
    for entry in rows:
        cat = entry["category_id"]
        category_totals[cat] = category_totals.get(cat, 0) + int(entry["quantity_non_spare"])
    # Keep only categories that appear at least once, most frequent first.
    kept = [cat for cat, total in category_totals.items() if total > 0]
    kept.sort(key=lambda cat: -category_totals[cat])
    if not kept:
        return
    # log1p compresses the dynamic range so rare categories stay visible.
    grid = np.array([[np.log1p(qty_by_cell.get((year, cat), 0)) for year in years] for cat in kept])
    fig, ax = plt.subplots(figsize=(12, 10))
    cmap = plt.get_cmap("magma")
    peak = grid.max() if grid.max() > 0 else 1
    im = ax.imshow(grid, aspect="auto", cmap=cmap, norm=Normalize(vmin=0, vmax=peak))
    ax.set_xticks(np.arange(len(years)))
    ax.set_xticklabels(years, rotation=45, ha="right")
    names = {entry["category_id"]: entry["category_name"] for entry in rows}
    ax.set_yticks(np.arange(len(kept)))
    ax.set_yticklabels([names[cat] for cat in kept])
    ax.set_xlabel("Année")
    ax.set_ylabel("Catégorie de pièce")
    ax.set_title("Intensité des catégories de pièces par année (log des quantités)")
    cbar = fig.colorbar(ScalarMappable(norm=im.norm, cmap=cmap), ax=ax, fraction=0.025, pad=0.015)
    cbar.ax.set_ylabel("log1p(quantité)", rotation=90)
    ensure_parent_dir(destination_path)
    fig.tight_layout()
    fig.savefig(destination_path, dpi=170)
    plt.close(fig)
def plot_structural_share_timeline(categories_by_year_path: Path, destination_path: Path) -> None: def plot_structural_share_timeline(categories_by_year_path: Path, destination_path: Path) -> None:
"""Trace l'évolution de la part des catégories structurelles.""" """Trace l'évolution de la part des catégories structurelles."""
rows = load_rows(categories_by_year_path) rows = load_rows(categories_by_year_path)

View File

@@ -6,7 +6,7 @@ from typing import List
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from matplotlib.offsetbox import AnnotationBbox, OffsetImage from matplotlib.offsetbox import AnnotationBbox, OffsetImage
from PIL import Image from PIL import Image, ImageDraw, ImageFont
from lib.filesystem import ensure_parent_dir from lib.filesystem import ensure_parent_dir
@@ -21,6 +21,22 @@ def load_part_rarity(path: Path) -> List[dict]:
return rows return rows
def select_printed_exclusive(rows: List[dict], resources_dir: Path) -> List[dict]:
    """Keep printed parts exclusive to the filtered sets that have a local image."""

    def eligible(row: dict) -> bool:
        # Exclusive: the part never appears in sets outside the filtered selection.
        if row.get("other_sets_quantity", "0") != "0":
            return False
        # Printed parts are identified by the word "print" in the catalog name.
        if "print" not in row["part_name"].lower():
            return False
        candidate = resources_dir / row.get("sample_set_id", "") / "rare_parts" / f"{row['part_num']}.jpg"
        return candidate.exists()

    kept = [row for row in rows if eligible(row)]
    kept.sort(key=lambda entry: (entry["part_name"], entry["part_num"]))
    return kept
def format_label(row: dict) -> str: def format_label(row: dict) -> str:
"""Formate létiquette de laxe vertical.""" """Formate létiquette de laxe vertical."""
return f"{row['part_num']}{row['part_name']}" return f"{row['part_num']}{row['part_name']}"
@@ -84,3 +100,56 @@ def plot_part_rarity(
ensure_parent_dir(destination_path) ensure_parent_dir(destination_path)
fig.savefig(destination_path, dpi=150) fig.savefig(destination_path, dpi=150)
plt.close(fig) plt.close(fig)
def plot_printed_exclusive_parts(
    path: Path,
    destination_path: Path,
    resources_dir: Path = Path("figures/rebrickable"),
    columns: int = 5,
) -> None:
    """Build a PNG collage of printed parts exclusive to the filtered sets.

    Reads the part-rarity CSV at *path*, keeps printed/exclusive parts that
    have a locally downloaded image (via select_printed_exclusive) and pastes
    them on a *columns*-wide grid with a caption under each thumbnail,
    saved to *destination_path*. Returns silently when nothing matches.
    """
    rows = load_part_rarity(path)
    selected = select_printed_exclusive(rows, resources_dir)
    # Collage order: sample-set year, then set number, then part reference;
    # rows with a missing/empty year sort last via the 9999 sentinel.
    selected.sort(key=lambda r: (int(r.get("sample_set_year", "9999") or 9999), r["sample_set_num"], r["part_num"]))
    if not selected:
        return
    images: List[Image.Image] = []
    labels: List[str] = []
    for row in selected:
        image_path = resources_dir / row["sample_set_id"] / "rare_parts" / f"{row['part_num']}.jpg"
        img = Image.open(image_path).convert("RGBA")
        # Downscale (never upscale) so every thumbnail fits a 180px square.
        max_side = 180
        ratio = min(max_side / img.width, max_side / img.height, 1.0)
        if ratio < 1.0:
            img = img.resize((int(img.width * ratio), int(img.height * ratio)))
        images.append(img)
        # NOTE(review): the caption concatenates year and set number with no
        # separator — possibly a character lost in extraction; confirm format.
        labels.append(f"{row.get('sample_set_year', '')}{row['sample_set_num']}")
    columns = max(1, columns)
    rows_count = (len(images) + columns - 1) // columns  # ceil division
    cell_width = 220
    font = ImageFont.load_default()
    # Scratch drawing context used only to measure text extents.
    draw_temp = ImageDraw.Draw(Image.new("RGB", (10, 10)))

    def measure(text: str) -> tuple[int, int]:
        """Return the (width, height) of *text* rendered with the default font."""
        bbox = draw_temp.textbbox((0, 0), text, font=font)
        return bbox[2] - bbox[0], bbox[3] - bbox[1]

    text_height = max(measure(label)[1] for label in labels)
    # 190px image area + caption height + padding per grid cell.
    cell_height = 190 + text_height + 14
    width = columns * cell_width
    height = rows_count * cell_height
    canvas = Image.new("RGBA", (width, height), (255, 255, 255, 255))
    draw = ImageDraw.Draw(canvas)
    for index, (img, label) in enumerate(zip(images, labels)):
        col = index % columns
        row_idx = index // columns
        # Center each thumbnail horizontally in its cell.
        x = col * cell_width + (cell_width - img.width) // 2
        y = row_idx * cell_height + 8
        canvas.paste(img, (x, y), img)
        text_width, _ = measure(label)
        text_x = col * cell_width + (cell_width - text_width) // 2
        text_y = y + img.height + 6
        draw.text((text_x, text_y), label, fill="#111111", font=font)
    ensure_parent_dir(destination_path)
    canvas.convert("RGB").save(destination_path, "PNG")

View File

@@ -0,0 +1,72 @@
"""Assemblage visuel des planches d'autocollants des sets filtrés."""
from pathlib import Path
from typing import List
from PIL import Image, ImageDraw, ImageFont
from lib.filesystem import ensure_parent_dir
from lib.rebrickable.stats import read_rows
def load_sticker_parts(path: Path) -> List[dict]:
    """Read the per-set sticker rows from the CSV at *path*."""
    rows = read_rows(path)
    return rows
def plot_sticker_sheets(
    stickers_path: Path,
    destination_path: Path,
    resources_dir: Path = Path("figures/rebrickable"),
    columns: int = 6,
) -> None:
    """Assemble the downloaded sticker-sheet images into a PNG grid sorted by year.

    Reads the sticker rows from *stickers_path*, keeps those whose image exists
    under ``{resources_dir}/{set_id}/stickers/{part_num}.jpg``, scales each one
    to fit a 260px square (never upscaling), and pastes them on a *columns*-wide
    grid with a caption under each cell, saved to *destination_path*.
    Does nothing when no image is available.
    """
    rows = load_sticker_parts(stickers_path)
    # Grid order: year, then set number, then sticker reference.
    rows.sort(key=lambda r: (int(r["year"]), r["set_num"], r["part_num"]))
    selected: List[dict] = []
    images: List[Image.Image] = []
    for row in rows:
        image_path = resources_dir / row["set_id"] / "stickers" / f"{row['part_num']}.jpg"
        if not image_path.exists():
            continue
        img = Image.open(image_path).convert("RGBA")
        max_side = 260
        ratio = min(max_side / img.width, max_side / img.height, 1.0)
        if ratio < 1.0:
            img = img.resize((int(img.width * ratio), int(img.height * ratio)))
        images.append(img)
        selected.append(row)
    if not images:
        return
    font = ImageFont.load_default()
    # One scratch drawing context, reused for every measurement (previously a
    # fresh Image + Draw pair was allocated on each call, twice per caption).
    scratch = ImageDraw.Draw(Image.new("RGB", (10, 10)))

    def measure(text: str) -> tuple[int, int]:
        """Return (width, height) of *text* rendered with the default font."""
        bbox = scratch.textbbox((0, 0), text, font=font)
        return bbox[2] - bbox[0], bbox[3] - bbox[1]

    # NOTE(review): the caption concatenates year, set id and part reference
    # with no separator — possibly characters lost in extraction; confirm.
    labels = [f"{row['year']}{row['set_id']}{row['part_num']}" for row in selected]
    # Measure each caption once and reuse both dimensions below.
    label_sizes = [measure(label) for label in labels]
    text_height = max(h for _, h in label_sizes)
    max_width = max(img.width for img in images)
    max_height = max(img.height for img in images)
    columns = max(1, columns)
    rows_count = (len(images) + columns - 1) // columns  # ceil division
    cell_width = max(max_width + 40, 240)
    cell_height = max_height + text_height + 20
    width = columns * cell_width
    height = rows_count * cell_height
    canvas = Image.new("RGBA", (width, height), (255, 255, 255, 255))
    draw = ImageDraw.Draw(canvas)
    for index, (img, label, size) in enumerate(zip(images, labels, label_sizes)):
        col = index % columns
        row_idx = index // columns
        # Center each image horizontally in its cell.
        x = col * cell_width + (cell_width - img.width) // 2
        y = row_idx * cell_height + 6
        canvas.paste(img, (x, y), img)
        text_width = size[0]
        text_x = col * cell_width + (cell_width - text_width) // 2
        text_y = y + img.height + 6
        draw.text((text_x, text_y), label, fill="#111111", font=font)
    ensure_parent_dir(destination_path)
    canvas.convert("RGB").save(destination_path, "PNG")

View File

@@ -46,6 +46,7 @@ def aggregate_filtered_parts(
parts_catalog: Dict[str, dict], parts_catalog: Dict[str, dict],
ignored_categories: Set[str] = IGNORED_PART_CATEGORY_IDS, ignored_categories: Set[str] = IGNORED_PART_CATEGORY_IDS,
ignored_minifig_categories: Set[str] = MINIFIG_PART_CATEGORY_IDS, ignored_minifig_categories: Set[str] = MINIFIG_PART_CATEGORY_IDS,
exclude_printed: bool = False,
) -> Dict[str, dict]: ) -> Dict[str, dict]:
"""Agrège les quantités par pièce pour les sets filtrés (rechanges incluses).""" """Agrège les quantités par pièce pour les sets filtrés (rechanges incluses)."""
aggregated: Dict[str, dict] = {} aggregated: Dict[str, dict] = {}
@@ -57,6 +58,8 @@ def aggregate_filtered_parts(
continue continue
if part["part_cat_id"] in ignored_minifig_categories: if part["part_cat_id"] in ignored_minifig_categories:
continue continue
if exclude_printed and "print" in part["name"].lower():
continue
entry = aggregated.get(row["part_num"]) entry = aggregated.get(row["part_num"])
if entry is None: if entry is None:
entry = {"quantity": 0, "set_numbers": set()} entry = {"quantity": 0, "set_numbers": set()}
@@ -73,6 +76,7 @@ def compute_other_set_usage(
filtered_set_numbers: Set[str], filtered_set_numbers: Set[str],
ignored_categories: Set[str] = IGNORED_PART_CATEGORY_IDS, ignored_categories: Set[str] = IGNORED_PART_CATEGORY_IDS,
ignored_minifig_categories: Set[str] = MINIFIG_PART_CATEGORY_IDS, ignored_minifig_categories: Set[str] = MINIFIG_PART_CATEGORY_IDS,
exclude_printed: bool = False,
) -> Dict[str, int]: ) -> Dict[str, int]:
"""Compte les occurrences des pièces dans le reste du catalogue (rechanges incluses).""" """Compte les occurrences des pièces dans le reste du catalogue (rechanges incluses)."""
inventories = select_latest_inventories(inventories_path) inventories = select_latest_inventories(inventories_path)
@@ -87,6 +91,8 @@ def compute_other_set_usage(
continue continue
if part["part_cat_id"] in ignored_minifig_categories: if part["part_cat_id"] in ignored_minifig_categories:
continue continue
if exclude_printed and "print" in part["name"].lower():
continue
totals[row["part_num"]] = totals.get(row["part_num"], 0) + int(row["quantity"]) totals[row["part_num"]] = totals.get(row["part_num"], 0) + int(row["quantity"])
return totals return totals
@@ -98,6 +104,7 @@ def build_part_rarity(
parts_catalog_path: Path, parts_catalog_path: Path,
part_categories_path: Path, part_categories_path: Path,
filtered_sets_path: Path, filtered_sets_path: Path,
exclude_printed: bool = False,
) -> List[dict]: ) -> List[dict]:
"""Construit le classement de rareté des pièces filtrées.""" """Construit le classement de rareté des pièces filtrées."""
parts_catalog = load_parts_catalog(parts_catalog_path) parts_catalog = load_parts_catalog(parts_catalog_path)
@@ -105,12 +112,13 @@ def build_part_rarity(
filtered_sets = load_filtered_sets(filtered_sets_path) filtered_sets = load_filtered_sets(filtered_sets_path)
filtered_set_numbers = set(filtered_sets.keys()) filtered_set_numbers = set(filtered_sets.keys())
filtered_rows = read_rows(parts_filtered_path) filtered_rows = read_rows(parts_filtered_path)
filtered_usage = aggregate_filtered_parts(filtered_rows, parts_catalog) filtered_usage = aggregate_filtered_parts(filtered_rows, parts_catalog, exclude_printed=exclude_printed)
other_usage = compute_other_set_usage( other_usage = compute_other_set_usage(
inventories_path, inventories_path,
inventory_parts_path, inventory_parts_path,
parts_catalog, parts_catalog,
filtered_set_numbers, filtered_set_numbers,
exclude_printed=exclude_printed,
) )
rows: List[dict] = [] rows: List[dict] = []
for part_num, entry in filtered_usage.items(): for part_num, entry in filtered_usage.items():
@@ -118,7 +126,8 @@ def build_part_rarity(
other_quantity = other_usage.get(part_num, 0) other_quantity = other_usage.get(part_num, 0)
total_quantity = entry["quantity"] + other_quantity total_quantity = entry["quantity"] + other_quantity
sample_set_num = sorted(entry["set_numbers"])[0] sample_set_num = sorted(entry["set_numbers"])[0]
sample_set_id = filtered_sets[sample_set_num]["set_id"] sample_set_row = filtered_sets[sample_set_num]
sample_set_id = sample_set_row["set_id"]
rows.append( rows.append(
{ {
"part_num": part_num, "part_num": part_num,
@@ -127,6 +136,7 @@ def build_part_rarity(
"part_category": categories[part["part_cat_id"]], "part_category": categories[part["part_cat_id"]],
"sample_set_num": sample_set_num, "sample_set_num": sample_set_num,
"sample_set_id": sample_set_id, "sample_set_id": sample_set_id,
"sample_set_year": sample_set_row["year"],
"filtered_quantity": str(entry["quantity"]), "filtered_quantity": str(entry["quantity"]),
"filtered_set_count": str(len(entry["set_numbers"])), "filtered_set_count": str(len(entry["set_numbers"])),
"other_sets_quantity": str(other_quantity), "other_sets_quantity": str(other_quantity),
@@ -148,6 +158,7 @@ def write_part_rarity(destination_path: Path, rows: Sequence[dict]) -> None:
"part_category", "part_category",
"sample_set_num", "sample_set_num",
"sample_set_id", "sample_set_id",
"sample_set_year",
"filtered_quantity", "filtered_quantity",
"filtered_set_count", "filtered_set_count",
"other_sets_quantity", "other_sets_quantity",

View File

@@ -0,0 +1,86 @@
"""Sélection des planches d'autocollants pour les sets filtrés."""
import csv
from pathlib import Path
from typing import Dict, Iterable, List, Tuple
from lib.filesystem import ensure_parent_dir
from lib.rebrickable.stats import read_rows
STICKER_CATEGORY_ID = "58"
def load_parts_catalog(path: Path) -> Dict[str, dict]:
    """Index the parts-catalog CSV by part reference (``part_num``).

    Later rows with a duplicate ``part_num`` overwrite earlier ones.
    """
    # newline="" is required by the csv module so quoted fields containing
    # newlines are parsed correctly.
    with path.open(newline="") as csv_file:
        return {row["part_num"]: row for row in csv.DictReader(csv_file)}
def load_sets(path: Path) -> Dict[str, dict]:
    """Index the enriched sets by their ``set_num``."""
    return {row["set_num"]: row for row in read_rows(path)}
def aggregate_stickers(
    rows: Iterable[dict],
    parts_catalog: Dict[str, dict],
) -> Dict[Tuple[str, str], int]:
    """Sum the non-spare sticker quantities keyed by (set_num, part_num)."""
    totals: Dict[Tuple[str, str], int] = {}
    for row in rows:
        # Spare parts are excluded from the sticker inventory.
        if row["is_spare"] == "true":
            continue
        catalog_entry = parts_catalog[row["part_num"]]
        # Only parts in the sticker category are kept.
        if catalog_entry["part_cat_id"] != STICKER_CATEGORY_ID:
            continue
        key = (row["set_num"], row["part_num"])
        totals[key] = totals.get(key, 0) + int(row["quantity_in_set"])
    return totals
def build_sticker_parts(
    parts_filtered_path: Path,
    parts_catalog_path: Path,
    sets_path: Path,
) -> List[dict]:
    """Build the per-set list of sticker sheets for the filtered sets."""
    rows = read_rows(parts_filtered_path)
    parts_catalog = load_parts_catalog(parts_catalog_path)
    sets_lookup = load_sets(sets_path)
    aggregated = aggregate_stickers(rows, parts_catalog)

    def as_record(set_num: str, part_num: str, quantity: int) -> dict:
        # One output row per (set, sticker sheet) pair.
        set_row = sets_lookup[set_num]
        return {
            "set_num": set_num,
            "set_id": set_row["set_id"],
            "year": set_row["year"],
            "name": set_row["name"],
            "part_num": part_num,
            "part_name": parts_catalog[part_num]["name"],
            "quantity": str(quantity),
        }

    stickers = [as_record(set_num, part_num, qty) for (set_num, part_num), qty in aggregated.items()]
    stickers.sort(key=lambda r: (int(r["year"]), r["set_num"], r["part_num"]))
    return stickers
def write_sticker_parts(destination_path: Path, rows: Iterable[dict]) -> None:
    """Serialize the per-set sticker rows as CSV at *destination_path*."""
    ensure_parent_dir(destination_path)
    columns = ["set_num", "set_id", "year", "name", "part_num", "part_name", "quantity"]
    with destination_path.open("w", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=columns)
        writer.writeheader()
        writer.writerows(rows)

View File

@@ -12,7 +12,9 @@ PARTS_CATALOG_PATH = Path("data/raw/parts.csv")
PART_CATEGORIES_PATH = Path("data/raw/part_categories.csv") PART_CATEGORIES_PATH = Path("data/raw/part_categories.csv")
FILTERED_SETS_PATH = Path("data/intermediate/sets_enriched.csv") FILTERED_SETS_PATH = Path("data/intermediate/sets_enriched.csv")
DESTINATION_PATH = Path("data/intermediate/part_rarity.csv") DESTINATION_PATH = Path("data/intermediate/part_rarity.csv")
DESTINATION_PRINTED_EXCLUDED_PATH = Path("data/intermediate/part_rarity_no_print.csv")
TOP_DESTINATION_PATH = Path("data/intermediate/part_rarity_exclusive.csv") TOP_DESTINATION_PATH = Path("data/intermediate/part_rarity_exclusive.csv")
TOP_PRINTED_EXCLUDED_PATH = Path("data/intermediate/part_rarity_exclusive_no_print.csv")
def main() -> None: def main() -> None:
@@ -29,6 +31,19 @@ def main() -> None:
top_rows = select_until_reused(rows) top_rows = select_until_reused(rows)
write_part_rarity(TOP_DESTINATION_PATH, top_rows) write_part_rarity(TOP_DESTINATION_PATH, top_rows)
rows_no_print = build_part_rarity(
PARTS_FILTERED_PATH,
INVENTORIES_PATH,
INVENTORY_PARTS_PATH,
PARTS_CATALOG_PATH,
PART_CATEGORIES_PATH,
FILTERED_SETS_PATH,
exclude_printed=True,
)
write_part_rarity(DESTINATION_PRINTED_EXCLUDED_PATH, rows_no_print)
top_rows_no_print = select_until_reused(rows_no_print)
write_part_rarity(TOP_PRINTED_EXCLUDED_PATH, top_rows_no_print)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -0,0 +1,21 @@
"""Extrait les planches d'autocollants des sets filtrés."""
from pathlib import Path
from lib.rebrickable.sticker_parts import build_sticker_parts, write_sticker_parts
PARTS_FILTERED_PATH = Path("data/intermediate/parts_filtered.csv")
PARTS_CATALOG_PATH = Path("data/raw/parts.csv")
SETS_PATH = Path("data/intermediate/sets_enriched.csv")
DESTINATION_PATH = Path("data/intermediate/sticker_parts.csv")
def main() -> None:
    """Build the CSV of sticker sheets present in the filtered sets."""
    write_sticker_parts(
        DESTINATION_PATH,
        build_sticker_parts(PARTS_FILTERED_PATH, PARTS_CATALOG_PATH, SETS_PATH),
    )
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,76 @@
"""Télécharge les images des planches d'autocollants des sets filtrés."""
import csv
import os
from pathlib import Path
import requests
from dotenv import load_dotenv
from lib.filesystem import ensure_parent_dir
from lib.rebrickable.resources import (
build_part_img_lookup,
download_binary,
download_resources,
fetch_part_img_url,
load_part_img_cache,
persist_part_img_cache,
)
from lib.rebrickable.stats import read_rows
STICKER_PARTS_PATH = Path("data/intermediate/sticker_parts.csv")
RESOURCES_DIR = Path("figures/rebrickable")
PART_IMG_CACHE_PATH = Path("data/intermediate/part_img_cache.csv")
DOWNLOAD_LOG_PATH = Path("data/intermediate/sticker_download_log.csv")
REQUEST_DELAY_SECONDS_IMAGES = 0.35
REQUEST_DELAY_SECONDS_LOOKUP = 0.6
def main() -> None:
    """Resolve missing part-image URLs, then download the sticker-sheet images.

    Requires the REBRICKABLE_TOKEN environment variable (optionally loaded from
    a .env file). Images land in the per-set "stickers" folder under
    RESOURCES_DIR; rows without a usable URL are recorded in DOWNLOAD_LOG_PATH.
    """
    load_dotenv()
    # Fails fast with KeyError if the token is not configured.
    token = os.environ["REBRICKABLE_TOKEN"]
    session = requests.Session()
    stickers = read_rows(STICKER_PARTS_PATH)
    cache = load_part_img_cache(PART_IMG_CACHE_PATH)
    # Resolve an image URL for every distinct part number, reusing the shared
    # cache and throttling API lookups.
    part_img_lookup = build_part_img_lookup(
        {row["part_num"] for row in stickers},
        fetcher=lambda part_num: fetch_part_img_url(part_num, token, session),
        cache_path=PART_IMG_CACHE_PATH,
        existing_cache=cache,
        delay_seconds=REQUEST_DELAY_SECONDS_LOOKUP,
    )
    # NOTE(review): re-applying the pre-existing cache on top of the lookup
    # means cached entries win over freshly fetched URLs — confirm intended.
    if cache:
        part_img_lookup.update(cache)
    persist_part_img_cache(PART_IMG_CACHE_PATH, part_img_lookup)
    plan = []
    missing_log = []
    for row in stickers:
        url = part_img_lookup.get(row["part_num"])
        path = RESOURCES_DIR / row["set_id"] / "stickers" / f"{row['part_num']}.jpg"
        if not url or not str(url).startswith("http"):
            # No usable URL: record the gap instead of attempting a download.
            missing_log.append({"url": url or "", "path": str(path), "status": "missing_url"})
            continue
        plan.append({"url": url, "path": path})
    # NOTE(review): when any URL is missing, log_path is None here and the
    # missing-entries writer below overwrites DOWNLOAD_LOG_PATH, so statuses
    # from download_resources are not persisted — confirm this trade-off.
    download_resources(
        plan,
        downloader=lambda url, path: download_binary(url, path, session),
        delay_seconds=REQUEST_DELAY_SECONDS_IMAGES,
        log_path=DOWNLOAD_LOG_PATH if not missing_log else None,
    )
    if missing_log:
        ensure_parent_dir(DOWNLOAD_LOG_PATH)
        with DOWNLOAD_LOG_PATH.open("w", newline="") as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=["url", "path", "status"])
            writer.writeheader()
            for row in missing_log:
                writer.writerow(row)
if __name__ == "__main__":
main()

View File

@@ -4,6 +4,7 @@ from pathlib import Path
from lib.plots.part_categories import ( from lib.plots.part_categories import (
plot_part_categories_heatmap, plot_part_categories_heatmap,
plot_part_categories_heatmap_log,
plot_structural_share_timeline, plot_structural_share_timeline,
plot_top_part_categories_area, plot_top_part_categories_area,
) )
@@ -13,6 +14,7 @@ CATEGORIES_BY_YEAR_PATH = Path("data/intermediate/part_categories_by_year.csv")
CATEGORIES_GLOBAL_PATH = Path("data/intermediate/part_categories_global.csv") CATEGORIES_GLOBAL_PATH = Path("data/intermediate/part_categories_global.csv")
AREA_DESTINATION = Path("figures/step29/top_part_categories_area.png") AREA_DESTINATION = Path("figures/step29/top_part_categories_area.png")
HEATMAP_DESTINATION = Path("figures/step29/part_categories_heatmap.png") HEATMAP_DESTINATION = Path("figures/step29/part_categories_heatmap.png")
HEATMAP_LOG_DESTINATION = Path("figures/step29/part_categories_heatmap_log.png")
STRUCTURAL_DESTINATION = Path("figures/step29/structural_share_timeline.png") STRUCTURAL_DESTINATION = Path("figures/step29/structural_share_timeline.png")
@@ -20,6 +22,7 @@ def main() -> None:
"""Génère les visuels de répartition par catégorie.""" """Génère les visuels de répartition par catégorie."""
plot_top_part_categories_area(CATEGORIES_BY_YEAR_PATH, CATEGORIES_GLOBAL_PATH, AREA_DESTINATION) plot_top_part_categories_area(CATEGORIES_BY_YEAR_PATH, CATEGORIES_GLOBAL_PATH, AREA_DESTINATION)
plot_part_categories_heatmap(CATEGORIES_BY_YEAR_PATH, HEATMAP_DESTINATION) plot_part_categories_heatmap(CATEGORIES_BY_YEAR_PATH, HEATMAP_DESTINATION)
plot_part_categories_heatmap_log(CATEGORIES_BY_YEAR_PATH, HEATMAP_LOG_DESTINATION)
plot_structural_share_timeline(CATEGORIES_BY_YEAR_PATH, STRUCTURAL_DESTINATION) plot_structural_share_timeline(CATEGORIES_BY_YEAR_PATH, STRUCTURAL_DESTINATION)

View File

@@ -2,17 +2,23 @@
from pathlib import Path from pathlib import Path
from lib.plots.part_rarity import plot_part_rarity from lib.plots.part_rarity import plot_part_rarity, plot_printed_exclusive_parts
PART_RARITY_TOP_PATH = Path("data/intermediate/part_rarity_exclusive.csv") PART_RARITY_TOP_PATH = Path("data/intermediate/part_rarity_exclusive.csv")
DESTINATION_PATH = Path("figures/step34/part_rarity.png") DESTINATION_PATH = Path("figures/step34/part_rarity.png")
RESOURCES_DIR = Path("figures/rebrickable") RESOURCES_DIR = Path("figures/rebrickable")
PART_RARITY_NO_PRINT_PATH = Path("data/intermediate/part_rarity_exclusive_no_print.csv")
DESTINATION_NO_PRINT = Path("figures/step34/part_rarity_no_print.png")
PART_RARITY_FULL_PATH = Path("data/intermediate/part_rarity.csv")
DESTINATION_PRINTED_COLLAGE = Path("figures/step34/printed_exclusive_parts.png")
def main() -> None: def main() -> None:
"""Charge le top des pièces rares et produit le graphique illustré.""" """Charge le top des pièces rares et produit le graphique illustré."""
plot_part_rarity(PART_RARITY_TOP_PATH, DESTINATION_PATH, resources_dir=RESOURCES_DIR) plot_part_rarity(PART_RARITY_TOP_PATH, DESTINATION_PATH, resources_dir=RESOURCES_DIR)
plot_part_rarity(PART_RARITY_NO_PRINT_PATH, DESTINATION_NO_PRINT, resources_dir=RESOURCES_DIR)
plot_printed_exclusive_parts(PART_RARITY_FULL_PATH, DESTINATION_PRINTED_COLLAGE, resources_dir=RESOURCES_DIR)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -0,0 +1,19 @@
"""Assemble les visuels des planches d'autocollants des sets filtrés."""
from pathlib import Path
from lib.plots.sticker_sheets import plot_sticker_sheets
STICKER_PARTS_PATH = Path("data/intermediate/sticker_parts.csv")
DESTINATION_PATH = Path("figures/step35/sticker_sheets.png")
RESOURCES_DIR = Path("figures/rebrickable")
def main() -> None:
    """Build the sticker-sheet collage figure from the computed CSV."""
    plot_sticker_sheets(STICKER_PARTS_PATH, DESTINATION_PATH, RESOURCES_DIR)
if __name__ == "__main__":
main()

View File

@@ -5,6 +5,7 @@ from pathlib import Path
from lib.plots.part_categories import ( from lib.plots.part_categories import (
plot_part_categories_heatmap, plot_part_categories_heatmap,
plot_part_categories_heatmap_log,
plot_structural_share_timeline, plot_structural_share_timeline,
plot_top_part_categories_area, plot_top_part_categories_area,
) )
@@ -31,15 +32,19 @@ def test_plot_part_categories_outputs_images(tmp_path: Path) -> None:
) )
area_dest = tmp_path / "figures" / "step29" / "top_part_categories_area.png" area_dest = tmp_path / "figures" / "step29" / "top_part_categories_area.png"
heatmap_dest = tmp_path / "figures" / "step29" / "part_categories_heatmap.png" heatmap_dest = tmp_path / "figures" / "step29" / "part_categories_heatmap.png"
heatmap_log_dest = tmp_path / "figures" / "step29" / "part_categories_heatmap_log.png"
structural_dest = tmp_path / "figures" / "step29" / "structural_share_timeline.png" structural_dest = tmp_path / "figures" / "step29" / "structural_share_timeline.png"
plot_top_part_categories_area(by_year, by_global, area_dest, top_n=2) plot_top_part_categories_area(by_year, by_global, area_dest, top_n=2)
plot_part_categories_heatmap(by_year, heatmap_dest) plot_part_categories_heatmap(by_year, heatmap_dest)
plot_part_categories_heatmap_log(by_year, heatmap_log_dest)
plot_structural_share_timeline(by_year, structural_dest) plot_structural_share_timeline(by_year, structural_dest)
assert area_dest.exists() assert area_dest.exists()
assert heatmap_dest.exists() assert heatmap_dest.exists()
assert heatmap_log_dest.exists()
assert structural_dest.exists() assert structural_dest.exists()
assert area_dest.stat().st_size > 0 assert area_dest.stat().st_size > 0
assert heatmap_dest.stat().st_size > 0 assert heatmap_dest.stat().st_size > 0
assert heatmap_log_dest.stat().st_size > 0
assert structural_dest.stat().st_size > 0 assert structural_dest.stat().st_size > 0

View File

@@ -58,6 +58,7 @@ def test_build_part_rarity_counts_spares_and_ignores_categories(tmp_path: Path)
["p4", "Figure Limb", "41", "Plastic"], ["p4", "Figure Limb", "41", "Plastic"],
["p5", "Sticker Sheet", "58", "Plastic"], ["p5", "Sticker Sheet", "58", "Plastic"],
["p6", "Exclusive Tile", "1", "Plastic"], ["p6", "Exclusive Tile", "1", "Plastic"],
["p7", "Slope 45 print", "1", "Plastic"],
], ],
) )
part_categories = tmp_path / "part_categories.csv" part_categories = tmp_path / "part_categories.csv"
@@ -95,6 +96,7 @@ def test_build_part_rarity_counts_spares_and_ignores_categories(tmp_path: Path)
["3", "p4", "1", "4", "True", ""], ["3", "p4", "1", "4", "True", ""],
["4", "p1", "1", "8", "False", ""], ["4", "p1", "1", "8", "False", ""],
["5", "p5", "1", "9", "False", ""], ["5", "p5", "1", "9", "False", ""],
["5", "p7", "1", "5", "False", ""],
], ],
) )
@@ -115,6 +117,7 @@ def test_build_part_rarity_counts_spares_and_ignores_categories(tmp_path: Path)
"part_category": "Bricks", "part_category": "Bricks",
"sample_set_num": "2000-1", "sample_set_num": "2000-1",
"sample_set_id": "2000", "sample_set_id": "2000",
"sample_set_year": "2021",
"filtered_quantity": "1", "filtered_quantity": "1",
"filtered_set_count": "1", "filtered_set_count": "1",
"other_sets_quantity": "0", "other_sets_quantity": "0",
@@ -128,6 +131,7 @@ def test_build_part_rarity_counts_spares_and_ignores_categories(tmp_path: Path)
"part_category": "Bricks", "part_category": "Bricks",
"sample_set_num": "1000-1", "sample_set_num": "1000-1",
"sample_set_id": "1000", "sample_set_id": "1000",
"sample_set_year": "2020",
"filtered_quantity": "3", "filtered_quantity": "3",
"filtered_set_count": "2", "filtered_set_count": "2",
"other_sets_quantity": "3", "other_sets_quantity": "3",
@@ -141,6 +145,7 @@ def test_build_part_rarity_counts_spares_and_ignores_categories(tmp_path: Path)
"part_category": "Large Buildable Figures", "part_category": "Large Buildable Figures",
"sample_set_num": "2000-1", "sample_set_num": "2000-1",
"sample_set_id": "2000", "sample_set_id": "2000",
"sample_set_year": "2021",
"filtered_quantity": "2", "filtered_quantity": "2",
"filtered_set_count": "1", "filtered_set_count": "1",
"other_sets_quantity": "4", "other_sets_quantity": "4",
@@ -150,6 +155,17 @@ def test_build_part_rarity_counts_spares_and_ignores_categories(tmp_path: Path)
] ]
assert select_until_reused(rows) == [rows[0], rows[1]] assert select_until_reused(rows) == [rows[0], rows[1]]
rows_no_print = build_part_rarity(
parts_filtered,
inventories,
inventory_parts,
parts_catalog,
part_categories,
sets_enriched,
exclude_printed=True,
)
assert all(r["part_num"] != "p7" for r in rows_no_print)
def test_write_part_rarity_outputs_csv(tmp_path: Path) -> None: def test_write_part_rarity_outputs_csv(tmp_path: Path) -> None:
"""Sérialise le classement de rareté.""" """Sérialise le classement de rareté."""
@@ -162,6 +178,7 @@ def test_write_part_rarity_outputs_csv(tmp_path: Path) -> None:
"part_category": "Bricks", "part_category": "Bricks",
"sample_set_num": "123-1", "sample_set_num": "123-1",
"sample_set_id": "123", "sample_set_id": "123",
"sample_set_year": "2020",
"filtered_quantity": "3", "filtered_quantity": "3",
"filtered_set_count": "2", "filtered_set_count": "2",
"other_sets_quantity": "3", "other_sets_quantity": "3",
@@ -175,7 +192,7 @@ def test_write_part_rarity_outputs_csv(tmp_path: Path) -> None:
assert destination.exists() assert destination.exists()
content = destination.read_text().strip().splitlines() content = destination.read_text().strip().splitlines()
assert content[0] == ( assert content[0] == (
"part_num,part_name,part_cat_id,part_category,sample_set_num,sample_set_id,filtered_quantity,filtered_set_count," "part_num,part_name,part_cat_id,part_category,sample_set_num,sample_set_id,sample_set_year,filtered_quantity,filtered_set_count,"
"other_sets_quantity,catalog_total_quantity,filtered_share" "other_sets_quantity,catalog_total_quantity,filtered_share"
) )
assert content[1] == "p1,Brick 1x1,1,Bricks,123-1,123,3,2,3,6,0.5000" assert content[1] == "p1,Brick 1x1,1,Bricks,123-1,123,2020,3,2,3,6,0.5000"

View File

@@ -0,0 +1,33 @@
"""Tests du collage des pièces imprimées exclusives."""
import matplotlib
from pathlib import Path
from PIL import Image
from lib.plots.part_rarity import plot_printed_exclusive_parts
matplotlib.use("Agg")
def test_plot_printed_exclusive_parts(tmp_path: Path) -> None:
    """Build the printed-exclusive-parts collage from locally stored images."""
    rarity_csv = tmp_path / "part_rarity.csv"
    resources_root = tmp_path / "figures" / "rebrickable"
    # One fixture image per sample set referenced by the CSV below.
    fixtures = [("1000", "p1", (255, 0, 0)), ("2000", "p2", (0, 255, 0))]
    for set_id, part_num, rgb in fixtures:
        part_dir = resources_root / set_id / "rare_parts"
        part_dir.mkdir(parents=True)
        Image.new("RGB", (60, 40), color=rgb).save(part_dir / f"{part_num}.jpg")
    rarity_csv.write_text(
        "part_num,part_name,part_cat_id,part_category,sample_set_num,sample_set_id,sample_set_year,filtered_quantity,filtered_set_count,other_sets_quantity,catalog_total_quantity,filtered_share\n"
        "p1,Slope print,1,Bricks,1000-1,1000,2020,3,2,0,3,1.0000\n"
        "p2,Tile print,1,Bricks,2000-1,2000,2021,2,1,0,2,1.0000\n"
        "p3,Tile plain,1,Bricks,2000-1,2000,2021,2,1,0,2,1.0000\n"
    )
    output_png = tmp_path / "figures" / "step34" / "printed_exclusive_parts.png"
    plot_printed_exclusive_parts(rarity_csv, output_png, resources_dir=resources_root, columns=2)
    # The collage must be created and non-empty.
    assert output_png.exists()
    assert output_png.stat().st_size > 0

104
tests/test_sticker_parts.py Normal file
View File

@@ -0,0 +1,104 @@
"""Tests de l'extraction des planches d'autocollants."""
import csv
from pathlib import Path
from lib.rebrickable.sticker_parts import build_sticker_parts, write_sticker_parts
def write_csv(path: Path, headers: list[str], rows: list[list[str]]) -> None:
    """Write a minimal CSV file: one header row followed by the data rows."""
    with path.open("w", newline="") as handle:
        # Emit header and data in a single call; equivalent to
        # writerow(headers) followed by writerows(rows).
        csv.writer(handle).writerows([headers, *rows])
def test_build_sticker_parts_filters_category_and_spares(tmp_path: Path) -> None:
    """Keep only sticker-sheet parts (category 58), dropping spare copies."""
    filtered_path = tmp_path / "parts_filtered.csv"
    write_csv(
        filtered_path,
        [
            "part_num",
            "color_rgb",
            "is_translucent",
            "set_num",
            "set_id",
            "year",
            "quantity_in_set",
            "is_spare",
            "is_minifig_part",
        ],
        [
            # Regular sticker sheet: kept.
            ["st1", "AAAAAA", "false", "1000-1", "1000", "2020", "1", "false", "false"],
            # Spare copy of the same sheet: dropped.
            ["st1", "AAAAAA", "false", "1000-1", "1000", "2020", "2", "true", "false"],
            # Non-sticker category: dropped.
            ["br1", "BBBBBB", "false", "1000-1", "1000", "2020", "5", "false", "false"],
            ["st2", "CCCCCC", "false", "2000-1", "2000", "2021", "3", "false", "false"],
        ],
    )
    catalog_path = tmp_path / "parts.csv"
    write_csv(
        catalog_path,
        ["part_num", "name", "part_cat_id", "part_material"],
        [
            ["st1", "Sticker Sheet 1", "58", "Plastic"],
            ["st2", "Sticker Sheet 2", "58", "Plastic"],
            ["br1", "Brick", "1", "Plastic"],
        ],
    )
    sets_path = tmp_path / "sets_enriched.csv"
    write_csv(
        sets_path,
        ["set_num", "set_id", "name", "year", "in_collection"],
        [
            ["1000-1", "1000", "Set A", "2020", "true"],
            ["2000-1", "2000", "Set B", "2021", "false"],
        ],
    )
    result = build_sticker_parts(filtered_path, catalog_path, sets_path)
    keys = ["set_num", "set_id", "year", "name", "part_num", "part_name", "quantity"]
    expected_values = [
        ("1000-1", "1000", "2020", "Set A", "st1", "Sticker Sheet 1", "1"),
        ("2000-1", "2000", "2021", "Set B", "st2", "Sticker Sheet 2", "3"),
    ]
    assert result == [dict(zip(keys, values)) for values in expected_values]
def test_write_sticker_parts_outputs_csv(tmp_path: Path) -> None:
    """Serialize the per-set sticker list to CSV with the expected header."""
    out_path = tmp_path / "sticker_parts.csv"
    sticker_row = {
        "set_num": "123-1",
        "set_id": "123",
        "year": "2020",
        "name": "Set",
        "part_num": "st1",
        "part_name": "Sticker",
        "quantity": "1",
    }
    write_sticker_parts(out_path, [sticker_row])
    assert out_path.exists()
    header, first_row = out_path.read_text().strip().splitlines()[:2]
    assert header == "set_num,set_id,year,name,part_num,part_name,quantity"
    assert first_row == "123-1,123,2020,Set,st1,Sticker,1"

View File

@@ -0,0 +1,27 @@
"""Tests du collage des planches d'autocollants."""
from pathlib import Path
from PIL import Image
from lib.plots.sticker_sheets import plot_sticker_sheets
def test_plot_sticker_sheets(tmp_path: Path) -> None:
    """Render the sticker-sheet grid with a label under each sheet."""
    sticker_csv = tmp_path / "sticker_parts.csv"
    resources_root = tmp_path / "figures" / "rebrickable"
    # One fixture sheet image per set referenced by the CSV below.
    fixtures = [
        ("1000", "st1", (120, 80), (255, 0, 0)),
        ("2000", "st2", (100, 60), (0, 255, 0)),
    ]
    for set_id, part_num, size, rgb in fixtures:
        sheet_dir = resources_root / set_id / "stickers"
        sheet_dir.mkdir(parents=True)
        Image.new("RGB", size, color=rgb).save(sheet_dir / f"{part_num}.jpg")
    sticker_csv.write_text(
        "set_num,set_id,year,name,part_num,part_name,quantity\n"
        "1000-1,1000,2020,Set A,st1,Sticker 1,1\n"
        "2000-1,2000,2021,Set B,st2,Sticker 2,1\n"
    )
    output_png = tmp_path / "figures" / "step35" / "sticker_sheets.png"
    plot_sticker_sheets(sticker_csv, output_png, resources_dir=resources_root, columns=2)
    # The collage must be created and non-empty.
    assert output_png.exists()
    assert output_png.stat().st_size > 0