150 lines
6.3 KiB
Python
150 lines
6.3 KiB
Python
"""Construction d'un inventaire détaillé des pièces par set."""
|
|
|
|
import csv
|
|
from pathlib import Path
|
|
from typing import Dict, List
|
|
|
|
from lib.filesystem import ensure_parent_dir
|
|
|
|
|
|
def normalize_boolean(raw_value: str) -> str:
|
|
"""Normalise une valeur booléenne en chaîne lowercase."""
|
|
return raw_value.lower()
|
|
|
|
|
|
def select_latest_inventories(inventories_path: Path) -> Dict[str, dict]:
|
|
"""Retient pour chaque set l'inventaire avec la version la plus élevée."""
|
|
latest_inventories: Dict[str, dict] = {}
|
|
with inventories_path.open() as inventories_file:
|
|
reader = csv.DictReader(inventories_file)
|
|
for row in reader:
|
|
current = latest_inventories.get(row["set_num"])
|
|
if current is None or int(row["version"]) > int(current["version"]):
|
|
latest_inventories[row["set_num"]] = {"id": row["id"], "version": row["version"]}
|
|
return latest_inventories
|
|
|
|
|
|
def build_color_lookup(colors_path: Path) -> Dict[str, dict]:
|
|
"""Construit un index des couleurs par identifiant."""
|
|
colors: Dict[str, dict] = {}
|
|
with colors_path.open() as colors_file:
|
|
reader = csv.DictReader(colors_file)
|
|
for row in reader:
|
|
colors[row["id"]] = {
|
|
"rgb": row["rgb"],
|
|
"is_translucent": normalize_boolean(row["is_trans"]),
|
|
}
|
|
return colors
|
|
|
|
|
|
def index_inventory_parts_by_inventory(inventory_parts_path: Path) -> Dict[str, List[dict]]:
|
|
"""Indexe les lignes d'inventaire par identifiant d'inventaire."""
|
|
parts_by_inventory: Dict[str, List[dict]] = {}
|
|
with inventory_parts_path.open() as parts_file:
|
|
reader = csv.DictReader(parts_file)
|
|
for row in reader:
|
|
inventory_id = row["inventory_id"]
|
|
if inventory_id not in parts_by_inventory:
|
|
parts_by_inventory[inventory_id] = []
|
|
parts_by_inventory[inventory_id].append(row)
|
|
return parts_by_inventory
|
|
|
|
|
|
def index_inventory_minifigs_by_inventory(inventory_minifigs_path: Path) -> Dict[str, List[dict]]:
|
|
"""Indexe les minifigs par inventaire."""
|
|
minifigs_by_inventory: Dict[str, List[dict]] = {}
|
|
with inventory_minifigs_path.open() as minifigs_file:
|
|
reader = csv.DictReader(minifigs_file)
|
|
for row in reader:
|
|
inventory_id = row["inventory_id"]
|
|
if inventory_id not in minifigs_by_inventory:
|
|
minifigs_by_inventory[inventory_id] = []
|
|
minifigs_by_inventory[inventory_id].append(row)
|
|
return minifigs_by_inventory
|
|
|
|
|
|
def build_minifig_lookup(minifigs_path: Path) -> Dict[str, dict]:
|
|
"""Construit un index des minifigs avec leur nombre de pièces."""
|
|
minifigs: Dict[str, dict] = {}
|
|
with minifigs_path.open() as minifigs_file:
|
|
reader = csv.DictReader(minifigs_file)
|
|
for row in reader:
|
|
minifigs[row["fig_num"]] = row
|
|
return minifigs
|
|
|
|
|
|
def write_parts_filtered(
|
|
sets_path: Path,
|
|
inventories_path: Path,
|
|
inventory_parts_path: Path,
|
|
colors_path: Path,
|
|
inventory_minifigs_path: Path,
|
|
minifigs_path: Path,
|
|
destination_path: Path,
|
|
) -> None:
|
|
"""Assemble un CSV agrégé listant les pièces par set et par couleur."""
|
|
latest_inventories = select_latest_inventories(inventories_path)
|
|
parts_by_inventory = index_inventory_parts_by_inventory(inventory_parts_path)
|
|
minifigs_by_inventory = index_inventory_minifigs_by_inventory(inventory_minifigs_path)
|
|
minifigs = build_minifig_lookup(minifigs_path)
|
|
colors = build_color_lookup(colors_path)
|
|
ensure_parent_dir(destination_path)
|
|
with sets_path.open() as sets_file, destination_path.open("w", newline="") as target_file:
|
|
sets_reader = csv.DictReader(sets_file)
|
|
fieldnames = [
|
|
"part_num",
|
|
"color_rgb",
|
|
"is_translucent",
|
|
"set_num",
|
|
"set_id",
|
|
"year",
|
|
"quantity_in_set",
|
|
"is_spare",
|
|
"is_minifig_part",
|
|
]
|
|
writer = csv.DictWriter(target_file, fieldnames=fieldnames)
|
|
writer.writeheader()
|
|
for set_row in sets_reader:
|
|
inventory = latest_inventories[set_row["set_num"]]
|
|
inventory_parts = parts_by_inventory[inventory["id"]]
|
|
inventory_total_non_spare = sum(
|
|
int(part_row["quantity"])
|
|
for part_row in inventory_parts
|
|
if normalize_boolean(part_row["is_spare"]) == "false"
|
|
)
|
|
expected_parts = int(set_row["num_parts"])
|
|
for part_row in inventory_parts:
|
|
color = colors[part_row["color_id"]]
|
|
writer.writerow(
|
|
{
|
|
"part_num": part_row["part_num"],
|
|
"color_rgb": color["rgb"],
|
|
"is_translucent": color["is_translucent"],
|
|
"set_num": set_row["set_num"],
|
|
"set_id": set_row["set_id"],
|
|
"year": set_row["year"],
|
|
"quantity_in_set": part_row["quantity"],
|
|
"is_spare": normalize_boolean(part_row["is_spare"]),
|
|
"is_minifig_part": "false",
|
|
}
|
|
)
|
|
if inventory_total_non_spare < expected_parts:
|
|
for minifig_row in minifigs_by_inventory.get(inventory["id"], []):
|
|
minifig_inventory = latest_inventories[minifig_row["fig_num"]]
|
|
minifig_parts = parts_by_inventory[minifig_inventory["id"]]
|
|
for part_row in minifig_parts:
|
|
color = colors[part_row["color_id"]]
|
|
writer.writerow(
|
|
{
|
|
"part_num": part_row["part_num"],
|
|
"color_rgb": color["rgb"],
|
|
"is_translucent": color["is_translucent"],
|
|
"set_num": set_row["set_num"],
|
|
"set_id": set_row["set_id"],
|
|
"year": set_row["year"],
|
|
"quantity_in_set": str(int(part_row["quantity"]) * int(minifig_row["quantity"])),
|
|
"is_spare": normalize_boolean(part_row["is_spare"]),
|
|
"is_minifig_part": "true",
|
|
}
|
|
)
|