Exemples prêts à l'emploi
Des pipelines fonctionnels pour vos use cases. Copiez, adaptez, lancez.
Filtrer par tag
Redimensionnement d'images en masse
Redimensionner 10 000 images en parallèle avec Pillow.
from pathlib import Path
from PIL import Image
from olympipe import Pipeline
def resize_image(src: Path) -> Path:
    """Resize one image to fit within 512x512, preserving aspect ratio.

    Args:
        src: Path to the source image file.

    Returns:
        Path to the resized copy (same filename, under src.parent / "resized").
    """
    dst = src.parent / "resized" / src.name
    # parents=True makes the call robust if intermediate folders are missing.
    dst.parent.mkdir(parents=True, exist_ok=True)
    with Image.open(src) as img:
        # thumbnail() resizes in place and never enlarges the image.
        img.thumbnail((512, 512))
        img.save(dst)
    return dst
# Fan the JPEGs out to 8 worker processes and collect the resized paths.
results = (
Pipeline(Path("images").glob("*.jpg"))
.task(resize_image, count=8)
.wait_for_result()
)
print(f"{len(results)} images resized")
Génération d'embeddings ML
Générer des embeddings pour 50 000 textes avec sentence-transformers.
from sentence_transformers import SentenceTransformer
from olympipe import Pipeline
class EmbedWorker:
    """Worker holding one SentenceTransformer model instance per process."""

    def __init__(self):
        # The model is loaded once per worker process, not once per batch.
        self.model = SentenceTransformer("all-MiniLM-L6-v2")

    def encode(self, batch: list[str]) -> list:
        """Encode a batch of texts into embedding vectors (lists of floats)."""
        return self.model.encode(batch, convert_to_numpy=True).tolist()
texts: list[str] = load_texts() # your 50,000 texts
embeddings = (
Pipeline(texts)
# Group texts into batches of 64 before encoding.
.batch(64)
# One EmbedWorker (and one model copy) per worker process.
.class_task(EmbedWorker, EmbedWorker.encode, count=2)
.wait_for_result()
)
Scraping web parallèle
Télécharger et parser 1 000 pages HTML avec httpx.
import httpx
from bs4 import BeautifulSoup
from olympipe import Pipeline
def fetch_page(url: str) -> dict:
    """Download a page and extract its title and a text excerpt.

    Args:
        url: Absolute URL of the page to fetch.

    Returns:
        dict with keys "url", "title" (empty string when absent or empty)
        and "text" (first 2000 characters of the visible text).
    """
    resp = httpx.get(url, timeout=10)
    soup = BeautifulSoup(resp.text, "html.parser")
    # soup.title.string is None for an empty <title>; normalize to "".
    title = soup.title.string if soup.title else ""
    return {
        "url": url,
        "title": title or "",
        "text": soup.get_text()[:2000],
    }
urls: list[str] = load_urls()
results = (
Pipeline(urls)
.task(fetch_page, count=20) # 20 simultaneous requests
# Drop pages with little or no visible text.
.filter(lambda r: len(r["text"]) > 100)
.wait_for_result()
)
Traitement de fichiers CSV
Lire et transformer des milliers de fichiers CSV en parallèle.
import csv
from pathlib import Path
from olympipe import Pipeline
def process_csv(path: Path) -> list[dict]:
    """Read one CSV file and keep rows whose "value" column is positive.

    Args:
        path: Path to a CSV file with at least a "value" column.

    Returns:
        One dict per kept row: {"file": <filename>, "value": <float>}.

    Raises:
        KeyError / ValueError: if a row lacks "value" or it is not numeric.
    """
    # newline="" is the csv-module-recommended way to open CSV files.
    with open(path, newline="") as f:
        return [
            {"file": path.name, "value": value}
            for row in csv.DictReader(f)
            if (value := float(row["value"])) > 0
        ]
# Process every CSV under data/ across 8 worker processes.
all_rows = (
Pipeline(Path("data").glob("*.csv"))
.task(process_csv, count=8)
.wait_for_result()
)
# Flatten the per-file result lists
flat = [row for rows in all_rows for row in rows]
Pipeline ETL complet
Extraire, filtrer, transformer par batch et écrire en base.
import sqlite3
from olympipe import Pipeline
def extract(record_id: int) -> dict:
    """Fetch one raw record from the upstream API by its numeric id."""
    return fetch_from_api(record_id)
def transform(batch: list[dict]) -> list[dict]:
    """Add a "normalized" field (value / max_value) to each record.

    Records with a non-positive max_value are dropped, which also guards
    against division by zero.

    Args:
        batch: records with at least "value" and "max_value" keys.

    Returns:
        New dicts (originals are not mutated) with "normalized" added.
    """
    return [
        {**r, "normalized": r["value"] / r["max_value"]}
        for r in batch
        if r["max_value"] > 0
    ]
def load_to_db(batch: list[dict]) -> int:
    """Insert a batch of transformed records into output.db.

    Args:
        batch: records with "id" and "normalized" keys.

    Returns:
        The number of records in the batch (all are inserted on success).
    """
    conn = sqlite3.connect("output.db")
    try:
        # The connection context manager commits on success and rolls back
        # on error, but it does NOT close the connection — close explicitly
        # so workers don't leak file handles.
        with conn:
            conn.executemany(
                "INSERT INTO records VALUES (?, ?)",
                [(r["id"], r["normalized"]) for r in batch],
            )
    finally:
        conn.close()
    return len(batch)
total = (
Pipeline(range(100_000))
.task(extract, count=16)
# Keep only records flagged as active.
.filter(lambda r: r.get("active"))
.batch(256)
.task(transform)
.task(load_to_db, count=4)
# Sum the per-batch insert counts into a grand total.
.wait_and_reduce(0, lambda acc, n: acc + n)
)
print(f"{total} records loaded")
Appels API externes en parallèle
Appeler une API tierce pour enrichir chaque item d'un dataset.
import httpx
from olympipe import Pipeline
def enrich(item: dict) -> dict:
    """Call the external enrichment API for one item and merge its response.

    Args:
        item: record with at least an "id" key.

    Returns:
        The original item merged with the JSON fields of the API response
        (API fields win on key collisions).
    """
    resp = httpx.post(
        "https://api.example.com/enrich",
        json={"id": item["id"]},
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=5,
    )
    return {**item, **resp.json()}
enriched = (
Pipeline(dataset) # 5,000 items
.task(enrich, count=32) # 32 simultaneous calls
# Drop items the API flagged with an "error" field.
.filter(lambda r: "error" not in r)
.wait_for_result()
)
Serveur d'inférence LLM
Servir un modèle HuggingFace comme API HTTP avec Olympipe.
import socket
from olympipe import Pipeline
from olympipe.helpers.server import send_json_response
class LLMWorker:
    """Worker holding one HuggingFace text-generation pipeline per process."""

    def __init__(self, model_name: str):
        # Import inside __init__ so the heavy dependency is loaded in the
        # worker process only.
        from transformers import pipeline as hf_pipeline

        self.pipe = hf_pipeline(
            "text-generation",
            model=model_name,
            device_map="auto",
        )

    def generate(self, pair: tuple) -> dict:
        """Generate a completion for one request and reply on its socket.

        Args:
            pair: (connection, parsed JSON body) as produced by the server;
                  the body must contain "prompt" and may contain
                  "max_new_tokens" (default 256).

        Returns:
            The response payload that was sent back to the client.
        """
        conn, data = pair
        output = self.pipe(
            data["prompt"],
            max_new_tokens=data.get("max_new_tokens", 256),
            do_sample=False,
        )
        result = {"response": output[0]["generated_text"]}
        send_json_response(conn, result)
        return result
# Expose POST /generate on port 8000; each request body flows into the pipeline.
Pipeline.server(
[("POST", "/generate", lambda body: body)],
port=8000,
).class_task(
LLMWorker,
LLMWorker.generate,
# Constructor args: the model each worker loads.
["mistralai/Mistral-7B-Instruct-v0.3"],
count=2,
).wait_for_completion()
Traitement audio en parallèle
Transcrire des fichiers audio avec Whisper en parallèle.
from pathlib import Path
from olympipe import Pipeline
import whisper
class WhisperWorker:
    """Worker holding one Whisper model instance per process."""

    def __init__(self, model_size: str):
        # The model is loaded once per worker process.
        self.model = whisper.load_model(model_size)

    def transcribe(self, path: Path) -> dict:
        """Transcribe one audio file.

        Returns:
            dict with "file" (the filename) and "text" (the transcription).
        """
        result = self.model.transcribe(str(path))
        return {"file": path.name, "text": result["text"]}
audio_files = list(Path("audio").glob("*.mp3"))
transcriptions = (
Pipeline(audio_files)
.class_task(
WhisperWorker,
WhisperWorker.transcribe,
# Constructor args: the Whisper model size each worker loads.
["base"],
count=2,
)
.wait_for_result()
)
Routage conditionnel par type
Traiter images et textes avec des pipelines distincts et fusionner.
from typing import Optional, Tuple
from olympipe import Pipeline
def route(item: dict) -> Tuple[Optional[dict], Optional[dict]]:
    """Dispatch an item to the image lane (slot 0) or the text lane (slot 1).

    Args:
        item: record with a "type" key.

    Returns:
        (item, None) when item["type"] == "image", otherwise (None, item).
    """
    if item["type"] == "image":
        return (item, None)
    return (None, item)
# route() sends each item to exactly one of the two output pipelines.
images, texts = Pipeline(mixed_items).split(route, n=2)
results = (
images
.task(process_image, count=4) # image pipeline
# Merge the text pipeline's output back into a single stream.
.gather(
texts
.task(process_text, count=2) # text pipeline
)
.wait_for_result()
)
Pipeline ML avec cache intermédiaire
Mettre en cache les features extraites pour itérer rapidement sur le modèle.
import tempfile
from olympipe import Pipeline
def extract_features(path: str) -> dict:
    """Extract features for one file (expensive: SIFT, ResNet features, etc.)."""
    return compute_features(path)
def classify(features: dict) -> str:
    """Predict a class label from a precomputed feature dict."""
    return my_classifier(features)
cache_dir = "/tmp/feature_cache"
# First run: extract the features and cache them on disk
results = (
Pipeline(file_paths)
.cached_task(extract_features, cache_dir=cache_dir)
.uncache()
.task(classify, count=4)
.wait_for_result()
)
# Iterating on the classifier: features are re-read from the disk cache
results_v2 = (
Pipeline(file_paths)
.cached_task(extract_features, cache_dir=cache_dir)
.uncache()
.task(classify_v2, count=4) # new classifier, same features
.wait_for_result()
)