Add step 35: sticker extraction and application
lib/rebrickable/sticker_parts.py (new file, 86 lines added)
@@ -0,0 +1,86 @@
"""Select the sticker sheets for the filtered sets."""

import csv
from pathlib import Path
from typing import Dict, Iterable, List, Tuple

from lib.filesystem import ensure_parent_dir
from lib.rebrickable.stats import read_rows


# Part category id used for sticker sheets in the Rebrickable data.
STICKER_CATEGORY_ID = "58"


def load_parts_catalog(path: Path) -> Dict[str, dict]:
    """Index the parts by part number."""
    catalog: Dict[str, dict] = {}
    with path.open() as csv_file:
        reader = csv.DictReader(csv_file)
        for row in reader:
            catalog[row["part_num"]] = row
    return catalog


def load_sets(path: Path) -> Dict[str, dict]:
    """Index the enriched sets by set_num."""
    lookup: Dict[str, dict] = {}
    for row in read_rows(path):
        lookup[row["set_num"]] = row
    return lookup


def aggregate_stickers(
    rows: Iterable[dict],
    parts_catalog: Dict[str, dict],
) -> Dict[Tuple[str, str], int]:
    """Accumulate sticker quantities per set and part number."""
    aggregated: Dict[Tuple[str, str], int] = {}
    for row in rows:
        # Spare parts are not counted.
        if row["is_spare"] == "true":
            continue
        part = parts_catalog[row["part_num"]]
        if part["part_cat_id"] != STICKER_CATEGORY_ID:
            continue
        key = (row["set_num"], row["part_num"])
        aggregated[key] = aggregated.get(key, 0) + int(row["quantity_in_set"])
    return aggregated


def build_sticker_parts(
    parts_filtered_path: Path,
    parts_catalog_path: Path,
    sets_path: Path,
) -> List[dict]:
    """Build the list of sticker sheets per set."""
    rows = read_rows(parts_filtered_path)
    parts_catalog = load_parts_catalog(parts_catalog_path)
    sets_lookup = load_sets(sets_path)
    aggregated = aggregate_stickers(rows, parts_catalog)
    stickers: List[dict] = []
    for (set_num, part_num), quantity in aggregated.items():
        set_row = sets_lookup[set_num]
        part = parts_catalog[part_num]
        stickers.append(
            {
                "set_num": set_num,
                "set_id": set_row["set_id"],
                "year": set_row["year"],
                "name": set_row["name"],
                "part_num": part_num,
                "part_name": part["name"],
                "quantity": str(quantity),
            }
        )
    stickers.sort(key=lambda r: (int(r["year"]), r["set_num"], r["part_num"]))
    return stickers


def write_sticker_parts(destination_path: Path, rows: Iterable[dict]) -> None:
    """Write the per-set stickers CSV."""
    ensure_parent_dir(destination_path)
    fieldnames = ["set_num", "set_id", "year", "name", "part_num", "part_name", "quantity"]
    with destination_path.open("w", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()
        for row in rows:
            writer.writerow(row)
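For context, a minimal sketch of how this module might be wired into a step script. The script layout and CSV file names below are assumptions for illustration, not part of this commit:

from pathlib import Path

from lib.rebrickable.sticker_parts import build_sticker_parts, write_sticker_parts

# Hypothetical step 35 entry point; adjust the paths to the project's data layout.
data_dir = Path("data/rebrickable")  # assumed directory
stickers = build_sticker_parts(
    parts_filtered_path=data_dir / "inventory_parts_filtered.csv",  # assumed name
    parts_catalog_path=data_dir / "parts.csv",
    sets_path=data_dir / "sets_enriched.csv",  # assumed name
)
write_sticker_parts(data_dir / "sticker_parts.csv", stickers)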