You've already forked etude_lego_jurassic_world
Création d'un fichier intermédiaire pour les statistiques sur les couleurs
This commit is contained in:
79
lib/rebrickable/colors_by_set.py
Normal file
79
lib/rebrickable/colors_by_set.py
Normal file
@@ -0,0 +1,79 @@
|
||||
"""Agrégation des couleurs utilisées par set."""
|
||||
|
||||
import csv
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterable, List, Tuple
|
||||
|
||||
from lib.filesystem import ensure_parent_dir
|
||||
|
||||
|
||||
def load_parts(parts_path: Path) -> List[dict]:
|
||||
"""Charge le fichier parts_filtered pour agrégation."""
|
||||
with parts_path.open() as parts_file:
|
||||
reader = csv.DictReader(parts_file)
|
||||
return list(reader)
|
||||
|
||||
|
||||
def build_colors_lookup(colors_path: Path) -> Dict[Tuple[str, str], str]:
|
||||
"""Construit un index (rgb, is_translucent) -> nom de couleur."""
|
||||
colors: Dict[Tuple[str, str], str] = {}
|
||||
with colors_path.open() as colors_file:
|
||||
reader = csv.DictReader(colors_file)
|
||||
for row in reader:
|
||||
colors[(row["rgb"], row["is_trans"].lower())] = row["name"]
|
||||
return colors
|
||||
|
||||
|
||||
def aggregate_colors_by_set(parts: Iterable[dict], colors_lookup: Dict[Tuple[str, str], str]) -> List[dict]:
|
||||
"""Agrège les quantités par set et par couleur."""
|
||||
totals: Dict[Tuple[str, str, str, str, str], dict] = {}
|
||||
for row in parts:
|
||||
key = (row["set_num"], row["set_id"], row["year"], row["color_rgb"], row["is_translucent"])
|
||||
existing = totals.get(key)
|
||||
if existing is None:
|
||||
totals[key] = {
|
||||
"set_num": row["set_num"],
|
||||
"set_id": row["set_id"],
|
||||
"year": row["year"],
|
||||
"color_rgb": row["color_rgb"],
|
||||
"is_translucent": row["is_translucent"],
|
||||
"color_name": colors_lookup[(row["color_rgb"], row["is_translucent"])],
|
||||
"quantity_total": 0,
|
||||
"quantity_non_spare": 0,
|
||||
"quantity_minifig": 0,
|
||||
"quantity_non_minifig": 0,
|
||||
}
|
||||
existing = totals[key]
|
||||
quantity = int(row["quantity_in_set"])
|
||||
existing["quantity_total"] += quantity
|
||||
if row["is_spare"] == "false":
|
||||
existing["quantity_non_spare"] += quantity
|
||||
if row["is_minifig_part"] == "true":
|
||||
existing["quantity_minifig"] += quantity
|
||||
else:
|
||||
existing["quantity_non_minifig"] += quantity
|
||||
aggregated = list(totals.values())
|
||||
aggregated.sort(key=lambda row: (row["set_num"], row["color_name"], row["is_translucent"]))
|
||||
return aggregated
|
||||
|
||||
|
||||
def write_colors_by_set(destination_path: Path, rows: Iterable[dict]) -> None:
|
||||
"""Sérialise l'agrégat set × couleur dans un CSV dédié."""
|
||||
ensure_parent_dir(destination_path)
|
||||
with destination_path.open("w", newline="") as csv_file:
|
||||
fieldnames = [
|
||||
"set_num",
|
||||
"set_id",
|
||||
"year",
|
||||
"color_rgb",
|
||||
"is_translucent",
|
||||
"color_name",
|
||||
"quantity_total",
|
||||
"quantity_non_spare",
|
||||
"quantity_minifig",
|
||||
"quantity_non_minifig",
|
||||
]
|
||||
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
|
||||
writer.writeheader()
|
||||
for row in rows:
|
||||
writer.writerow(row)
|
||||
Reference in New Issue
Block a user