Filtrage
Élaguez votre flux sans rompre le pipeline
`.filter(fn)` ne laisse passer que les items pour lesquels `fn(x)` est vrai. Peut être combiné avec `count` pour paralléliser le prédicat lui-même — utile si le test est coûteux (appel réseau, inférence ML).
python
from olympipe import Pipeline
def is_valid(record: dict) -> bool:
return record.get("score", 0) >= 0.8 and record.get("label") is not None
results = (
Pipeline(raw_records) # 10 000 records
.filter(is_valid) # garde seulement score >= 0.8
.task(enrich, count=4) # enrichissement parallèle
.wait_for_result()
)⚡ Performance
10 000 items, prédicat réseau (2ms/item)
Séquentiel 20.0s
4 workers 5.3s
8 workers 2.8s
🚀 7.1× plus rapide
Comment ça fonctionne
Le `FilterPipe` consomme les items, évalue le prédicat et envoie uniquement les items retenus dans la file suivante. Les items filtrés sont abandonnés sans bloquer le flux.
