
Ready-to-use examples

Functional pipelines for your use cases. Copy, adapt, run.


Bulk image resizing

Resize 10 000 images in parallel using Pillow.

· Feature: Parallel tasks
from pathlib import Path
from PIL import Image
from olympipe import Pipeline

def resize_image(src: Path) -> Path:
    dst = src.parent / "resized" / src.name
    dst.parent.mkdir(exist_ok=True)
    with Image.open(src) as img:
        img.thumbnail((512, 512))
        img.save(dst)
    return dst

results = (
    Pipeline(Path("images").glob("*.jpg"))
    .task(resize_image, count=8)
    .wait_for_result()
)
print(f"{len(results)} images resized")

ML embedding generation

Generate embeddings for 50 000 texts with sentence-transformers.

· Feature: Stateful workers
from sentence_transformers import SentenceTransformer
from olympipe import Pipeline

class EmbedWorker:
    def __init__(self):
        self.model = SentenceTransformer("all-MiniLM-L6-v2")

    def encode(self, batch: list[str]) -> list:
        return self.model.encode(batch, convert_to_numpy=True).tolist()

texts: list[str] = load_texts()   # your 50 000 texts

embeddings = (
    Pipeline(texts)
    .batch(64)
    .class_task(EmbedWorker, EmbedWorker.encode, count=2)
    .wait_for_result()
)
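
load_texts is not provided by olympipe; a minimal sketch, assuming your texts live one per line in a plain-text file (the file name is hypothetical):

from pathlib import Path

def load_texts() -> list[str]:
    # Hypothetical helper: one text per line in texts.txt, blank lines skipped
    return [
        line.strip()
        for line in Path("texts.txt").read_text(encoding="utf-8").splitlines()
        if line.strip()
    ]

If, as in the CSV example below, each batch produces its own list of vectors, flatten the result to get one vector per text.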

Parallel web scraping

Download and parse 1 000 HTML pages with httpx.

· Feature: Parallel tasks
import httpx
from bs4 import BeautifulSoup
from olympipe import Pipeline

def fetch_page(url: str) -> dict:
    resp = httpx.get(url, timeout=10)
    soup = BeautifulSoup(resp.text, "html.parser")
    return {
        "url": url,
        "title": soup.title.string if soup.title else "",
        "text": soup.get_text()[:2000],
    }

urls: list[str] = load_urls()

results = (
    Pipeline(urls)
    .task(fetch_page, count=20)    # 20 concurrent requests
    .filter(lambda r: len(r["text"]) > 100)
    .wait_for_result()
)
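
load_urls is likewise left to you; a minimal sketch, assuming a urls.txt file with one URL per line (the file name is hypothetical):

from pathlib import Path

def load_urls() -> list[str]:
    # Hypothetical helper: one URL per line, blank lines skipped
    return [
        line.strip()
        for line in Path("urls.txt").read_text().splitlines()
        if line.strip()
    ]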

CSV file processing

Read and transform thousands of CSV files in parallel.

· Feature: Parallel tasks
import csv
from pathlib import Path
from olympipe import Pipeline

def process_csv(path: Path) -> list[dict]:
    results = []
    with open(path) as f:
        for row in csv.DictReader(f):
            value = float(row["value"])
            if value > 0:
                results.append({"file": path.name, "value": value})
    return results

all_rows = (
    Pipeline(Path("data").glob("*.csv"))
    .task(process_csv, count=8)
    .wait_for_result()
)

# Flatten the per-file result lists
flat = [row for rows in all_rows for row in rows]
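
If you want a single consolidated file afterwards, a short follow-up sketch reusing csv and flat from above (the output path is hypothetical):

with open("combined.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["file", "value"])
    writer.writeheader()
    writer.writerows(flat)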

Full ETL pipeline

Extract, filter, batch-transform and write to a database.

· Feature: Batching
import sqlite3
from olympipe import Pipeline

def extract(record_id: int) -> dict:
    return fetch_from_api(record_id)

def transform(batch: list[dict]) -> list[dict]:
    return [
        {**r, "normalized": r["value"] / r["max_value"]}
        for r in batch if r["max_value"] > 0
    ]

def load_to_db(batch: list[dict]) -> int:
    with sqlite3.connect("output.db") as conn:
        conn.executemany(
            "INSERT INTO records VALUES (?, ?)",
            [(r["id"], r["normalized"]) for r in batch],
        )
    return len(batch)

total = (
    Pipeline(range(100_000))
    .task(extract, count=16)
    .filter(lambda r: r.get("active"))
    .batch(256)
    .task(transform)
    .task(load_to_db, count=4)
    .wait_and_reduce(0, lambda acc, n: acc + n)
)
print(f"{total} records loaded")
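
fetch_from_api is a placeholder; a minimal sketch with httpx, assuming a hypothetical REST endpoint that returns each record as JSON:

import httpx

def fetch_from_api(record_id: int) -> dict:
    # Hypothetical endpoint; adapt the URL and authentication to your API
    resp = httpx.get(f"https://api.example.com/records/{record_id}", timeout=10)
    resp.raise_for_status()
    return resp.json()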

Parallel external API calls

Call a third-party API to enrich each item of a dataset.

· Feature: Parallel tasks
import httpx
from olympipe import Pipeline

def enrich(item: dict) -> dict:
    resp = httpx.post(
        "https://api.example.com/enrich",
        json={"id": item["id"]},
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=5,
    )
    return {**item, **resp.json()}

enriched = (
    Pipeline(dataset)             # 5 000 items
    .task(enrich, count=32)       # 32 concurrent calls
    .filter(lambda r: "error" not in r)
    .wait_for_result()
)
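
API_KEY and dataset are up to you; a minimal sketch, assuming the key comes from an environment variable and the items from a JSON file (both names are hypothetical):

import json
import os

API_KEY = os.environ["ENRICH_API_KEY"]      # hypothetical variable name

with open("dataset.json") as f:             # hypothetical file
    dataset: list[dict] = json.load(f)      # each item must carry an "id"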

LLM inference server

Serve a HuggingFace model as an HTTP API with Olympipe.

· Feature: Stateful workers
import socket
from olympipe import Pipeline
from olympipe.helpers.server import send_json_response

class LLMWorker:
    def __init__(self, model_name: str):
        from transformers import pipeline as hf_pipeline
        self.pipe = hf_pipeline(
            "text-generation",
            model=model_name,
            device_map="auto",
        )

    def generate(self, pair: tuple) -> dict:
        conn, data = pair
        output = self.pipe(
            data["prompt"],
            max_new_tokens=data.get("max_new_tokens", 256),
            do_sample=False,
        )
        result = {"response": output[0]["generated_text"]}
        send_json_response(conn, result)
        return result

Pipeline.server(
    [("POST", "/generate", lambda body: body)],
    port=8000,
).class_task(
    LLMWorker,
    LLMWorker.generate,
    ["mistralai/Mistral-7B-Instruct-v0.3"],
    count=2,
).wait_for_completion()
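
Once the server is running, any HTTP client can call it. A minimal sketch, assuming send_json_response writes the JSON body back on the incoming connection (the prompt is illustrative):

import httpx

resp = httpx.post(
    "http://localhost:8000/generate",
    json={"prompt": "Explain pipelines in one sentence.", "max_new_tokens": 64},
    timeout=120,
)
print(resp.json()["response"])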

Parallel audio processing

Transcribe audio files with Whisper in parallel.

· Feature: Stateful workers
from pathlib import Path
from olympipe import Pipeline
import whisper

class WhisperWorker:
    def __init__(self, model_size: str):
        self.model = whisper.load_model(model_size)

    def transcribe(self, path: Path) -> dict:
        result = self.model.transcribe(str(path))
        return {"file": path.name, "text": result["text"]}

audio_files = list(Path("audio").glob("*.mp3"))

transcriptions = (
    Pipeline(audio_files)
    .class_task(
        WhisperWorker,
        WhisperWorker.transcribe,
        ["base"],
        count=2,
    )
    .wait_for_result()
)
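
To keep the transcripts, a short follow-up sketch writing them to a JSON file (the output path is hypothetical):

import json

with open("transcriptions.json", "w", encoding="utf-8") as f:
    json.dump(transcriptions, f, ensure_ascii=False, indent=2)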

Conditional routing by type

Process images and texts with distinct pipelines and merge the results.

· Feature: Split & Gather
from typing import Optional, Tuple
from olympipe import Pipeline

def route(item: dict) -> Tuple[Optional[dict], Optional[dict]]:
    if item["type"] == "image":
        return (item, None)
    return (None, item)

images, texts = Pipeline(mixed_items).split(route, n=2)

results = (
    images
    .task(process_image, count=4)   # image pipeline
    .gather(
        texts
        .task(process_text, count=2) # text pipeline
    )
    .wait_for_result()
)
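
mixed_items, process_image and process_text are placeholders; a purely illustrative sketch of what they could look like:

def process_image(item: dict) -> dict:
    # Hypothetical image branch: thumbnails, OCR, feature extraction...
    return {**item, "processed": True}

def process_text(item: dict) -> dict:
    # Hypothetical text branch: cleaning, language detection, embedding...
    return {**item, "processed": True}

mixed_items = [
    {"type": "image", "path": "photo.jpg"},
    {"type": "text", "content": "hello world"},
]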

ML pipeline with intermediate cache

Cache extracted features to iterate quickly on the model.

· Feature: Disk cache
from olympipe import Pipeline

def extract_features(path: str) -> dict:
    # Heavy computation: SIFT, ResNet features, etc.
    return compute_features(path)

def classify(features: dict) -> str:
    return my_classifier(features)

cache_dir = "/tmp/feature_cache"

# First run: extract the features and cache them to disk
results = (
    Pipeline(file_paths)
    .cached_task(extract_features, cache_dir=cache_dir)
    .uncache()
    .task(classify, count=4)
    .wait_for_result()
)

# Iterate on the classifier: features are re-read from disk
results_v2 = (
    Pipeline(file_paths)
    .cached_task(extract_features, cache_dir=cache_dir)
    .uncache()
    .task(classify_v2, count=4)    # new classifier, same features
    .wait_for_result()
)
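
compute_features, my_classifier, classify_v2 and file_paths are placeholders; a minimal sketch just to make the example runnable end to end (everything below is illustrative):

from pathlib import Path

file_paths = [str(p) for p in Path("images").glob("*.jpg")]

def compute_features(path: str) -> dict:
    # Hypothetical heavy step; swap in SIFT, a ResNet forward pass, etc.
    return {"path": path, "size": Path(path).stat().st_size}

def my_classifier(features: dict) -> str:
    # Hypothetical stand-in for a trained model
    return "large" if features["size"] > 1_000_000 else "small"

classify_v2 = my_classifier    # the second run would plug a new model in here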