Filtering
Prune your stream without breaking the pipeline
`.filter(fn)` passes along only the items for which `fn(x)` is truthy. It can be combined with `count` to run the predicate itself in parallel, which is useful when the test is expensive (a network call, ML inference).
```python
from olympipe import Pipeline

# raw_records and enrich are assumed to be defined elsewhere.


def is_valid(record: dict) -> bool:
    return record.get("score", 0) >= 0.8 and record.get("label") is not None


results = (
    Pipeline(raw_records)    # 10 000 records
    .filter(is_valid)        # keep only records with score >= 0.8 and a label
    .task(enrich, count=4)   # parallel enrichment
    .wait_for_result()
)
```
⚡ Performance
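Benchmark: 10 000 items, network predicate (2 ms/item)

| Configuration | Wall time |
| --- | --- |
| Sequential | 20.0 s |
| 4 workers | 5.3 s |
| 8 workers | 2.8 s |

🚀 7.1× faster with 8 workers than sequential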
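As a rough sanity check on the figures above, the following sketch compares each measured time with the ideal time `items × latency / workers`. The helper `ideal_seconds` is a hypothetical name used for illustration and is not part of olympipe; the measured values are copied from the benchmark, and the model assumes the predicate latency dominates everything else.

```python
ITEMS = 10_000
LATENCY_S = 0.002  # 2 ms per predicate call, as stated in the benchmark

# Measured wall times in seconds, keyed by worker count (1 = sequential).
measured = {1: 20.0, 4: 5.3, 8: 2.8}


def ideal_seconds(workers: int) -> float:
    """Wall time if predicate calls were split perfectly across workers."""
    return ITEMS * LATENCY_S / workers


for workers, seconds in measured.items():
    speedup = measured[1] / seconds
    efficiency = ideal_seconds(workers) / seconds
    print(
        f"{workers} worker(s): ideal {ideal_seconds(workers):.1f}s, "
        f"measured {seconds:.1f}s, speedup {speedup:.1f}x, "
        f"efficiency {efficiency:.0%}"
    )
```

Under this simple model the 4-worker run reaches about 94% of the ideal and the 8-worker run about 89%. The remaining gap is presumably queueing and worker start-up overhead, which the benchmark does not break down.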
How it works
The `FilterPipe` consumes items, evaluates the predicate, and forwards only the items it keeps to the next queue. Items that fail the predicate are dropped without stalling the stream.
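The consume, evaluate, forward loop described above can be sketched with the standard library alone. This is not olympipe's actual implementation: it uses threads and `queue.Queue` for brevity, and the names `filter_worker`, `run_filter`, and `_DONE` are hypothetical, chosen only for this illustration.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def filter_worker(predicate, inbox: queue.Queue, outbox: queue.Queue) -> None:
    """Consume items, forward those the predicate keeps, drop the rest."""
    while True:
        item = inbox.get()
        if item is _DONE:
            break
        if predicate(item):
            outbox.put(item)  # kept: forwarded to the next stage
        # Rejected items are simply never forwarded.


def run_filter(predicate, items, count: int = 1) -> list:
    """Run `count` filter workers over `items` and collect what they keep."""
    inbox: queue.Queue = queue.Queue()
    outbox: queue.Queue = queue.Queue()
    workers = [
        threading.Thread(target=filter_worker, args=(predicate, inbox, outbox))
        for _ in range(count)
    ]
    for worker in workers:
        worker.start()
    for item in items:
        inbox.put(item)
    for _ in workers:
        inbox.put(_DONE)  # one sentinel per worker so each one exits
    for worker in workers:
        worker.join()
    kept = []
    while not outbox.empty():  # safe: all producers have finished
        kept.append(outbox.get())
    return kept


records = [
    {"score": 0.9, "label": "a"},
    {"score": 0.5, "label": "b"},
    {"score": 0.95, "label": None},
    {"score": 0.8, "label": "c"},
]
kept = run_filter(
    lambda r: r.get("score", 0) >= 0.8 and r.get("label") is not None,
    records,
    count=2,
)
print(sorted(r["label"] for r in kept))
```

With more than one worker, kept items may reach the output queue in a different order than they entered, which is why the example sorts before printing. Whether olympipe preserves ordering in this case is not stated here.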
