Ready-to-use examples
Functional pipelines for your use cases. Copy, adapt, run.

Parallel image resizing
Resize a folder of JPEG images with Pillow across eight parallel workers.

from pathlib import Path

from PIL import Image

from olympipe import Pipeline

def resize_image(src: Path) -> Path:
    dst = src.parent / "resized" / src.name
    dst.parent.mkdir(exist_ok=True)
    with Image.open(src) as img:
        img.thumbnail((512, 512))
        img.save(dst)
    return dst

results = (
    Pipeline(Path("images").glob("*.jpg"))
    .task(resize_image, count=8)
    .wait_for_result()
)
print(f"{len(results)} images resized")ML embedding generation

ML embedding generation
Generate embeddings for 50,000 texts with sentence-transformers.

from sentence_transformers import SentenceTransformer

from olympipe import Pipeline

class EmbedWorker:
    def __init__(self):
        self.model = SentenceTransformer("all-MiniLM-L6-v2")

    def encode(self, batch: list[str]) -> list:
        return self.model.encode(batch, convert_to_numpy=True).tolist()

texts: list[str] = load_texts()  # your 50,000 texts

embeddings = (
    Pipeline(texts)
    .batch(64)
    .class_task(EmbedWorker, EmbedWorker.encode, count=2)
    .wait_for_result()
)
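
Because the texts go through batch(64), encode returns one list of vectors per batch, so the collected result is a list of batches rather than one flat list. A small sketch to flatten it into a single numpy matrix (numpy is already a dependency of sentence-transformers):

import numpy as np

# One entry per 64-text batch; flatten into one row per input text.
matrix = np.array([vec for batch in embeddings for vec in batch])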

Parallel web scraping
Fetch pages concurrently with httpx and parse them with BeautifulSoup.

import httpx
from bs4 import BeautifulSoup
from olympipe import Pipeline

def fetch_page(url: str) -> dict:
    resp = httpx.get(url, timeout=10)
    soup = BeautifulSoup(resp.text, "html.parser")
    return {
        "url": url,
        "title": soup.title.string if soup.title else "",
        "text": soup.get_text()[:2000],
    }

urls: list[str] = load_urls()

results = (
    Pipeline(urls)
    .task(fetch_page, count=20)  # 20 concurrent requests
    .filter(lambda r: len(r["text"]) > 100)
    .wait_for_result()
)
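
Network calls fail routinely; rather than letting an exception escape a worker, you can degrade failures into empty records, which the length filter above already drops. A sketch (fetch_page_safe is illustrative, not an olympipe API):

def fetch_page_safe(url: str) -> dict:
    try:
        return fetch_page(url)
    except httpx.HTTPError:
        # Empty text, so the len(r["text"]) > 100 filter discards it.
        return {"url": url, "title": "", "text": ""}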

CSV file processing
Read and transform thousands of CSV files in parallel.

import csv
from pathlib import Path

from olympipe import Pipeline

def process_csv(path: Path) -> list[dict]:
    results = []
    with open(path) as f:
        for row in csv.DictReader(f):
            value = float(row["value"])
            if value > 0:
                results.append({"file": path.name, "value": value})
    return results

all_rows = (
    Pipeline(Path("data").glob("*.csv"))
    .task(process_csv, count=8)
    .wait_for_result()
)

# Flatten the per-file lists of rows
flat = [row for rows in all_rows for row in rows]
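
Real-world CSVs are rarely clean: a missing or non-numeric "value" column would raise inside a worker. A defensive variant that skips malformed rows instead of failing the whole file (process_csv_safe is illustrative):

def process_csv_safe(path: Path) -> list[dict]:
    results = []
    with open(path) as f:
        for row in csv.DictReader(f):
            try:
                value = float(row["value"])
            except (KeyError, TypeError, ValueError):
                continue  # skip malformed rows
            if value > 0:
                results.append({"file": path.name, "value": value})
    return results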

ETL pipeline into SQLite
Extract records from an API, transform them in batches, and load them into SQLite.

import sqlite3

from olympipe import Pipeline

def extract(record_id: int) -> dict:
    return fetch_from_api(record_id)

def transform(batch: list[dict]) -> list[dict]:
    return [
        {**r, "normalized": r["value"] / r["max_value"]}
        for r in batch if r["max_value"] > 0
    ]

def load_to_db(batch: list[dict]) -> int:
    with sqlite3.connect("output.db") as conn:
        conn.executemany(
            "INSERT INTO records VALUES (?, ?)",
            [(r["id"], r["normalized"]) for r in batch],
        )
    return len(batch)

total = (
    Pipeline(range(100_000))
    .task(extract, count=16)
    .filter(lambda r: r.get("active"))
    .batch(256)
    .task(transform)
    .task(load_to_db, count=4)
    .wait_and_reduce(0, lambda acc, n: acc + n)
)
print(f"{total} records loaded")

Parallel external API calls
Call a third-party API to enrich each item of a dataset.

import httpx

from olympipe import Pipeline

def enrich(item: dict) -> dict:
    resp = httpx.post(
        "https://api.example.com/enrich",
        json={"id": item["id"]},
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=5,
    )
    return {**item, **resp.json()}

enriched = (
    Pipeline(dataset)  # 5,000 items
    .task(enrich, count=32)  # 32 concurrent calls
    .filter(lambda r: "error" not in r)
    .wait_for_result()
)
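
The filter above assumes the API reports failures with an "error" key in its JSON response. To also survive transport errors, you can wrap the call with a retry; enrich_with_retry below is an illustrative sketch, not part of olympipe:

def enrich_with_retry(item: dict, attempts: int = 3) -> dict:
    for attempt in range(attempts):
        try:
            return enrich(item)
        except httpx.HTTPError:
            if attempt == attempts - 1:
                # Mark the item so the "error" filter discards it.
                return {**item, "error": "enrich failed"}
    return item  # not reached; satisfies type checkers

Swap it into the pipeline in place of enrich and the existing filter drops the failures.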

LLM inference server
Serve a HuggingFace model as an HTTP API with Olympipe.

import socket

from olympipe import Pipeline
from olympipe.helpers.server import send_json_response

class LLMWorker:
    def __init__(self, model_name: str):
        from transformers import pipeline as hf_pipeline

        self.pipe = hf_pipeline(
            "text-generation",
            model=model_name,
            device_map="auto",
        )

    def generate(self, pair: tuple) -> dict:
        conn, data = pair
        output = self.pipe(
            data["prompt"],
            max_new_tokens=data.get("max_new_tokens", 256),
            do_sample=False,
        )
        result = {"response": output[0]["generated_text"]}
        send_json_response(conn, result)
        return result

Pipeline.server(
    [("POST", "/generate", lambda body: body)],
    port=8000,
).class_task(
    LLMWorker,
    LLMWorker.generate,
    ["mistralai/Mistral-7B-Instruct-v0.3"],
    count=2,
).wait_for_completion()
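
Once the server is running, any HTTP client can call it. A sketch assuming it listens on localhost and the JSON body carries the fields generate reads:

import httpx

resp = httpx.post(
    "http://localhost:8000/generate",
    json={"prompt": "Summarize olympipe in one sentence.", "max_new_tokens": 64},
    timeout=120,  # generation can be slow, especially on the first request
)
print(resp.json()["response"])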

Parallel audio processing
Transcribe audio files with Whisper in parallel.

from pathlib import Path

import whisper

from olympipe import Pipeline

class WhisperWorker:
    def __init__(self, model_size: str):
        self.model = whisper.load_model(model_size)

    def transcribe(self, path: Path) -> dict:
        result = self.model.transcribe(str(path))
        return {"file": path.name, "text": result["text"]}

audio_files = list(Path("audio").glob("*.mp3"))

transcriptions = (
    Pipeline(audio_files)
    .class_task(
        WhisperWorker,
        WhisperWorker.transcribe,
        ["base"],
        count=2,
    )
    .wait_for_result()
)
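
With count=2, two worker processes are spawned and each loads its own copy of the model, so size count to the memory you have. A small follow-up to persist the transcripts (the file name is illustrative):

import json

# One JSON file for the whole batch of transcripts.
with open("transcriptions.json", "w") as f:
    json.dump(transcriptions, f, ensure_ascii=False, indent=2)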

Conditional routing by type
Process images and texts with distinct pipelines and merge the results.

from typing import Optional, Tuple

from olympipe import Pipeline

def route(item: dict) -> Tuple[Optional[dict], Optional[dict]]:
    if item["type"] == "image":
        return (item, None)
    return (None, item)

images, texts = Pipeline(mixed_items).split(route, n=2)

results = (
    images
    .task(process_image, count=4)  # image pipeline
    .gather(
        texts
        .task(process_text, count=2)  # text pipeline
    )
    .wait_for_result()
)
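
The example leaves mixed_items, process_image, and process_text undefined. Hypothetical stubs (define them above the pipeline) to make it runnable end to end:

def process_image(item: dict) -> dict:
    # Placeholder image branch; tag items so merged results stay traceable.
    return {**item, "processed_by": "image"}

def process_text(item: dict) -> dict:
    # Placeholder text branch.
    return {**item, "processed_by": "text"}

mixed_items = [
    {"type": "image", "path": "cat.jpg"},
    {"type": "text", "content": "hello world"},
]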

ML pipeline with intermediate cache
Cache extracted features to iterate quickly on the model.

from olympipe import Pipeline

def extract_features(path: str) -> dict:
    # Heavy computation: SIFT, ResNet features, etc.
    return compute_features(path)

def classify(features: dict) -> str:
    return my_classifier(features)

cache_dir = "/tmp/feature_cache"

# First run: extract the features and cache them
results = (
    Pipeline(file_paths)
    .cached_task(extract_features, cache_dir=cache_dir)
    .uncache()
    .task(classify, count=4)
    .wait_for_result()
)

# Iterating on the classifier: features are re-read from disk
results_v2 = (
    Pipeline(file_paths)
    .cached_task(extract_features, cache_dir=cache_dir)
    .uncache()
    .task(classify_v2, count=4)  # new classifier, same features
    .wait_for_result()
)
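
If extract_features itself changes, the cached values are stale; clearing the cache directory forces a fresh extraction on the next run (a plain filesystem call, nothing olympipe-specific):

import shutil

# Drop the cached features; the next run re-extracts everything.
shutil.rmtree(cache_dir, ignore_errors=True)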