
Filtering

Prune your stream without breaking the pipeline

`.filter(fn)` passes only the items for which `fn(x)` is truthy. It can be combined with `count` to parallelize the predicate itself, which is useful when the test is expensive (network call, ML inference).

```python
from olympipe import Pipeline

def is_valid(record: dict) -> bool:
    # A record is valid when its score is at least 0.8 and it has a label.
    return record.get("score", 0) >= 0.8 and record.get("label") is not None

results = (
    Pipeline(raw_records)          # 10 000 records
    .filter(is_valid)              # keep only valid records (score >= 0.8)
    .task(enrich, count=4)         # parallel enrichment
    .wait_for_result()
)
```
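To illustrate what parallelizing the predicate means, here is a minimal standard-library sketch. It is not olympipe's implementation: `parallel_filter` is a hypothetical helper, and the use of a thread pool is an assumption that suits I/O-bound predicates such as network calls. It reuses `is_valid` from the example above on a few made-up records.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")


def parallel_filter(
    items: Iterable[T], predicate: Callable[[T], bool], count: int = 4
) -> List[T]:
    """Hypothetical helper: keep items whose predicate is truthy,
    evaluating up to `count` predicates concurrently."""
    materialized = list(items)
    with ThreadPoolExecutor(max_workers=count) as pool:
        # pool.map preserves input order, so verdicts line up with items.
        verdicts = list(pool.map(predicate, materialized))
    return [item for item, keep in zip(materialized, verdicts) if keep]


def is_valid(record: dict) -> bool:
    return record.get("score", 0) >= 0.8 and record.get("label") is not None


# Made-up sample data for illustration only.
records = [
    {"score": 0.9, "label": "cat"},
    {"score": 0.5, "label": "dog"},
    {"score": 0.95, "label": None},
    {"score": 0.8, "label": "bird"},
]
kept = parallel_filter(records, is_valid, count=4)
```

Only the first and last records survive: the second fails the score threshold and the third has no label. For CPU-bound predicates a process pool would likely be the better fit, since threads in CPython share one interpreter lock.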

Performance

10 000 items, network predicate (2ms/item)

Sequential 20.0s
4 workers 5.3s
8 workers 2.8s
🚀 7.1× faster (8 workers vs. sequential)
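The speedup figure can be checked from the timings listed here. The short calculation below also derives parallel efficiency (speedup divided by worker count), which is not part of the original figures but follows directly from them.

```python
# Timings taken from the benchmark: 10 000 items at 2 ms per predicate call.
sequential_s = 20.0
timings_s = {4: 5.3, 8: 2.8}  # worker count -> wall-clock seconds


def speedup(workers: int) -> float:
    """Sequential time divided by the time measured with `workers` workers."""
    return sequential_s / timings_s[workers]


def efficiency(workers: int) -> float:
    """Fraction of the ideal linear speedup actually achieved."""
    return speedup(workers) / workers


for workers in sorted(timings_s):
    print(f"{workers} workers: {speedup(workers):.1f}x, "
          f"{efficiency(workers):.0%} efficiency")
```

With 8 workers the ratio is 20.0 / 2.8, about 7.1, which matches the headline number. Efficiency drops slightly from 4 to 8 workers (roughly 94% to 89%), as expected once coordination overhead grows.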

How it works

The `FilterPipe` consumes items from its input queue, evaluates the predicate on each one, and forwards only the items that pass to the next queue. Rejected items are dropped without stalling the stream.
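The consume, evaluate, forward loop described here can be sketched with two queues and a worker thread. This is an illustrative model only, not the actual `FilterPipe` code: `filter_stage` and the `_DONE` sentinel are hypothetical names, and the use of threads and `queue.Queue` is an assumption made to keep the example self-contained.

```python
import queue
import threading
from typing import Any, Callable

_DONE = object()  # hypothetical end-of-stream marker


def filter_stage(
    inbox: "queue.Queue[Any]",
    outbox: "queue.Queue[Any]",
    predicate: Callable[[Any], bool],
) -> None:
    """Read items from inbox and forward only those the predicate keeps."""
    while True:
        item = inbox.get()
        if item is _DONE:
            outbox.put(_DONE)  # propagate end-of-stream downstream
            return
        if predicate(item):
            outbox.put(item)
        # Rejected items are not forwarded; the loop moves straight on.


inbox: "queue.Queue[Any]" = queue.Queue()
outbox: "queue.Queue[Any]" = queue.Queue()
worker = threading.Thread(
    target=filter_stage, args=(inbox, outbox, lambda n: n % 2 == 0)
)
worker.start()

for n in range(10):
    inbox.put(n)
inbox.put(_DONE)
worker.join()

# Drain the output queue up to the end-of-stream marker.
forwarded = []
while True:
    item = outbox.get()
    if item is _DONE:
        break
    forwarded.append(item)
```

Because a rejected item produces no output at all, the downstream stage never waits on it. It simply sees the next kept item, here the even numbers from 0 to 8.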