1
2025-11-18 09:01:34 +01:00

112 lines
3.3 KiB
Python

"""Détection d'événements météorologiques et extraction de segments alignés."""
from __future__ import annotations
from typing import Sequence
import numpy as np
import pandas as pd
from .core import _ensure_datetime_index, _infer_time_step
__all__ = ['detect_threshold_events', 'build_event_aligned_segments']
def detect_threshold_events(
    series: pd.Series,
    *,
    threshold: float,
    min_duration: pd.Timedelta,
    min_gap: pd.Timedelta,
) -> list[tuple[pd.Timestamp, pd.Timestamp]]:
    """Find the time spans during which ``series`` stays above ``threshold``.

    Samples that compare as missing are treated as "not above". Each run of
    consecutive above-threshold samples becomes a candidate spanning from its
    first timestamp to its last timestamp plus one inferred time step.
    Candidates shorter than ``min_duration`` are dropped first; the survivors
    are then merged whenever the gap between one candidate's end and the next
    candidate's start is strictly smaller than ``min_gap``.

    Args:
        series: Values indexed by a ``pd.DatetimeIndex``.
        threshold: Strict lower bound a sample must exceed.
        min_duration: Minimum length (inclusive) for a run to be kept.
        min_gap: Runs separated by less than this are fused together.

    Returns:
        A list of ``(start, end)`` timestamp pairs in index order; empty when
        nothing qualifies.

    Raises:
        TypeError: If ``series`` is not indexed by a ``pd.DatetimeIndex``.
    """
    if not isinstance(series.index, pd.DatetimeIndex):
        raise TypeError("series doit être indexée par le temps.")

    above = (series > threshold).fillna(False)
    if not above.any():
        return []

    # A new run id starts every time the boolean value flips.
    run_ids = (above != above.shift()).cumsum()
    step = _infer_time_step(series.index)

    # Phase 1: keep only the above-threshold runs that last long enough.
    candidates: list[tuple[pd.Timestamp, pd.Timestamp]] = []
    for _, run in above.groupby(run_ids):
        if run.iloc[0]:
            first_ts = run.index[0]
            # The last sample is assumed to cover one full time step.
            last_ts = run.index[-1] + step
            if last_ts - first_ts >= min_duration:
                candidates.append((first_ts, last_ts))

    if not candidates:
        return []

    # Phase 2: fuse neighbouring candidates that are closer than min_gap.
    fused: list[tuple[pd.Timestamp, pd.Timestamp]] = [candidates[0]]
    for begin, finish in candidates[1:]:
        open_begin, open_finish = fused[-1]
        if begin - open_finish < min_gap:
            fused[-1] = (open_begin, max(open_finish, finish))
        else:
            fused.append((begin, finish))
    return fused
def build_event_aligned_segments(
    df: pd.DataFrame,
    events: Sequence[tuple[pd.Timestamp, pd.Timestamp]],
    columns: Sequence[str],
    *,
    window_before_minutes: int,
    window_after_minutes: int,
    resample_minutes: int = 1,
) -> pd.DataFrame:
    """Extract a window around each event start and stack the windows.

    For every event, the selected columns are looked up on a regular grid
    running from ``start - window_before_minutes`` to
    ``start + window_after_minutes`` with a spacing of ``resample_minutes``.
    The rows are then re-labelled with the event's position in ``events`` and
    the offset (in minutes, as float) from the event start.

    When ``resample_minutes > 1`` the data are first averaged into bins of
    that width; otherwise they are used as they are.

    Note:
        The lookup uses exact label matching (``reindex`` without a fill
        method). Window timestamps that are not present in the (possibly
        resampled) data yield NaN rows, e.g. when an event start does not
        fall on the resampled grid.

    Args:
        df: Source data indexed by time.
        events: ``(start, end)`` pairs; only ``start`` is used here.
        columns: Columns of ``df`` to extract.
        window_before_minutes: Window length before the event start.
        window_after_minutes: Window length after the event start.
        resample_minutes: Grid spacing in minutes.

    Returns:
        A DataFrame indexed by a MultiIndex ``(event_id, offset_minutes)``.
        When there is nothing to return, the frame is empty but keeps the same
        columns and the same index level names, so level-based operations keep
        working for callers.
    """
    index_names = ["event_id", "offset_minutes"]

    def _empty_result() -> pd.DataFrame:
        # Same index schema as the populated result (int ids, float offsets).
        empty_index = pd.MultiIndex.from_arrays(
            [np.array([], dtype=int), np.array([], dtype=float)],
            names=index_names,
        )
        return pd.DataFrame(index=empty_index, columns=columns)

    if not events:
        return _empty_result()

    # Return value is not needed here; presumably this validates that df has
    # a datetime index (helper is defined in .core) -- TODO confirm.
    _ensure_datetime_index(df)

    data = df[columns].sort_index()
    freq = pd.Timedelta(minutes=resample_minutes)
    if resample_minutes > 1:
        data = data.resample(freq).mean()

    before = pd.Timedelta(minutes=window_before_minutes)
    after = pd.Timedelta(minutes=window_after_minutes)
    one_minute = pd.Timedelta(minutes=1)

    segments: list[pd.DataFrame] = []
    for event_id, (start, _end) in enumerate(events):
        window_index = pd.date_range(start - before, start + after, freq=freq)
        segment = data.reindex(window_index)
        if segment.empty:
            # Happens when the window grid itself is empty.
            continue
        offsets = ((segment.index - start) / one_minute).astype(float)
        segment.index = pd.MultiIndex.from_arrays(
            [np.full(len(segment), event_id), offsets],
            names=index_names,
        )
        segments.append(segment)

    if not segments:
        return _empty_result()
    return pd.concat(segments)