
Split & Gather

Branch your pipeline into independent streams, then merge them back.

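`.split(fn, n=2)` routes each item to anywhere from 0 to `n` branches: `fn` returns an `n`-tuple, and every non-`None` element is sent to the branch at the same position. `.gather(*pipes)` merges the calling pipeline and the pipelines passed to it into a single stream. This suits datasets whose item categories each need their own processing logic.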
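To make the routing rule concrete, here is a plain-Python sketch of the semantics described above, with no olympipe involved. `split_items` and `route` are hypothetical helpers written only for this illustration, not part of olympipe's API, and the real `.split` streams items through queues rather than collecting them into lists.

```python
from typing import Callable, Iterable, List, Optional, Sequence, Tuple, TypeVar

T = TypeVar("T")


def split_items(
    items: Iterable[T],
    fn: Callable[[T], Sequence[Optional[T]]],
    n: int = 2,
) -> List[List[T]]:
    """Hypothetical illustration: distribute items into n branches following fn's n-tuple."""
    branches: List[List[T]] = [[] for _ in range(n)]
    for item in items:
        routed = fn(item)
        if len(routed) != n:
            raise ValueError(f"fn must return exactly {n} values, got {len(routed)}")
        for branch, value in zip(branches, routed):
            if value is not None:  # None means "don't send to this branch"
                branch.append(value)
    return branches


def route(item: dict) -> Tuple[Optional[dict], Optional[dict]]:
    # Hypothetical router showing the 0-to-n fan-out.
    if item.get("both"):
        return (item, item)   # sent to both branches
    if item["type"] == "image":
        return (item, None)   # images branch only
    if item["type"] == "text":
        return (None, item)   # texts branch only
    return (None, None)       # dropped: sent to no branch


data = [
    {"type": "image", "id": 1},
    {"type": "text", "id": 2},
    {"type": "text", "id": 3, "both": True},
    {"type": "audio", "id": 4},
]
images, texts = split_items(data, route)
print([d["id"] for d in images])  # [1, 3]
print([d["id"] for d in texts])   # [2, 3]
```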

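```python
from typing import Optional, Tuple

from olympipe import Pipeline

# Hypothetical stand-ins for your own dataset and processing functions.
mixed_dataset = [{"type": "image" if i % 2 == 0 else "text", "id": i} for i in range(10)]

def process_image(item: dict) -> dict:
    return {**item, "processed_as": "image"}

def process_text(item: dict) -> dict:
    return {**item, "processed_as": "text"}

def route_by_type(item: dict) -> Tuple[Optional[dict], Optional[dict]]:
    if item["type"] == "image":
        return (item, None)   # → images branch
    return (None, item)       # → texts branch

images, texts = Pipeline(mixed_dataset).split(route_by_type, n=2)

results = (
    images.task(process_image, count=4)               # image pipeline
          .gather(texts.task(process_text, count=2))  # + text pipeline
          .wait_for_result()
)
```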

Performance

5,000 mixed items, each type processed with its own logic

Sequential, single unified pipeline: 15.0 s
Split into 2 branches: 4.0 s
🚀 3.8× faster
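The headline figure is the ratio of the two timings above; the same numbers also give the throughput of each setup:

```latex
\text{speedup} = \frac{15.0\,\text{s}}{4.0\,\text{s}} = 3.75 \approx 3.8\times
\qquad
\frac{5000}{15.0\,\text{s}} \approx 333\ \text{items/s}
\;\longrightarrow\;
\frac{5000}{4.0\,\text{s}} = 1250\ \text{items/s}
```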

How it works

Each branch is an independent `Pipeline` with its own queue. A `None` at position i of the tuple returned by `fn` means "don't send this item to branch i". `gather` creates a `GatherPipe` that consumes the N branch queues in round-robin order.
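The queue-per-branch and round-robin merge model described above can be sketched with the standard library. This is a conceptual model only: it assumes threads and `queue.Queue` as stand-ins for olympipe's internal workers and queues, and `run_branch`, `gather_round_robin` and the `_DONE` end-of-stream sentinel are hypothetical names, not olympipe's `GatherPipe` implementation.

```python
import queue
import threading
from typing import Any, Callable, Iterable, Iterator, List

_DONE = object()  # hypothetical sentinel marking the end of one branch's stream


def run_branch(items: Iterable[Any], fn: Callable[[Any], Any], out: queue.Queue) -> threading.Thread:
    """Start one branch: apply fn to each item and push results onto the branch's own queue."""

    def worker() -> None:
        for item in items:
            out.put(fn(item))
        out.put(_DONE)  # tell the gatherer this branch is finished

    thread = threading.Thread(target=worker)
    thread.start()
    return thread


def gather_round_robin(queues: List[queue.Queue], poll: float = 0.01) -> Iterator[Any]:
    """Merge several branch queues, taking at most one item from each queue per pass."""
    active = list(queues)
    while active:
        for q in list(active):  # iterate over a copy so finished queues can be removed
            try:
                item = q.get(timeout=poll)
            except queue.Empty:
                continue  # nothing ready on this branch yet; move on to the next one
            if item is _DONE:
                active.remove(q)
            else:
                yield item


image_q: queue.Queue = queue.Queue()
text_q: queue.Queue = queue.Queue()
threads = [
    run_branch([1, 2, 3], lambda x: f"image-{x}", image_q),
    run_branch([10, 20], lambda x: f"text-{x}", text_q),
]
results = list(gather_round_robin([image_q, text_q]))
for t in threads:
    t.join()
print(sorted(results))  # ['image-1', 'image-2', 'image-3', 'text-10', 'text-20']
```

In this sketch an idle branch only delays the others by the short polling timeout, and items from the same branch keep their relative order; the interleaving across branches depends on timing, which is why the usage line sorts the merged results.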

Related examples