1
etude_lego_jurassic_world/tests/test_downloader.py

94 lines
2.9 KiB
Python

"""Tests du module de téléchargement Rebrickable."""
import gzip
from pathlib import Path
import responses
from lib.rebrickable.downloader import (
build_rebrickable_url,
download_rebrickable_file,
download_rebrickable_files,
)
def test_build_rebrickable_url() -> None:
"""Construit l'URL complète vers Rebrickable."""
assert build_rebrickable_url("themes.csv.gz") == (
"https://cdn.rebrickable.com/media/downloads/themes.csv.gz"
)
@responses.activate
def test_download_rebrickable_file(tmp_path: Path) -> None:
"""Télécharge, enregistre et décompresse le fichier compressé."""
file_name = "themes.csv.gz"
uncompressed_content = b"compressed-data"
compressed_body = gzip.compress(uncompressed_content)
responses.add(
responses.GET,
build_rebrickable_url(file_name),
body=compressed_body,
status=200,
)
target_path = download_rebrickable_file(file_name, tmp_path)
assert target_path == tmp_path / "themes.csv"
assert target_path.read_bytes() == uncompressed_content
assert not (tmp_path / file_name).exists()
@responses.activate
def test_download_skips_when_cache_is_fresh(tmp_path: Path) -> None:
"""Ne retélécharge pas un fichier récent et conserve le contenu."""
file_name = "themes.csv.gz"
cached_path = tmp_path / "themes.csv"
cached_path.write_bytes(b"cached")
target_path = download_rebrickable_file(file_name, tmp_path)
assert target_path == cached_path
assert target_path.read_bytes() == b"cached"
assert not (tmp_path / file_name).exists()
assert len(responses.calls) == 0
@responses.activate
def test_download_multiple_rebrickable_files(tmp_path: Path) -> None:
"""Télécharge plusieurs fichiers compressés et les décompresse."""
file_names = [
"inventories.csv.gz",
"inventory_parts.csv.gz",
"parts.csv.gz",
"colors.csv.gz",
"part_categories.csv.gz",
]
compressed_bodies = {}
for file_name in file_names:
uncompressed_content = file_name.encode()
compressed_body = gzip.compress(uncompressed_content)
compressed_bodies[file_name] = compressed_body
responses.add(
responses.GET,
build_rebrickable_url(file_name),
body=compressed_body,
status=200,
)
downloaded_paths = download_rebrickable_files(file_names, tmp_path)
assert downloaded_paths == [
tmp_path / "inventories.csv",
tmp_path / "inventory_parts.csv",
tmp_path / "parts.csv",
tmp_path / "colors.csv",
tmp_path / "part_categories.csv",
]
assert len(responses.calls) == len(file_names)
for file_name in file_names:
target_path = tmp_path / file_name
decompressed_path = target_path.with_suffix("")
assert decompressed_path.read_bytes() == file_name.encode()
assert not target_path.exists()