Declarative stage composition, automatic backpressure, and 40+ connectors. Stream, transform, and route data with pure Python.
Define pipelines as composable stages. Chain sources, transforms, and sinks with a fluent API. No boilerplate.
Automatic flow control across stages. Slow consumers don't crash producers. Configurable buffer policies per stage.
Kafka, Redis Streams, PostgreSQL CDC, S3, Parquet, webhooks, and more. Write custom connectors in under 20 lines.
from pipelinecast import Pipeline, stages

# Define a real-time pipeline
pipeline = Pipeline("user-events")

pipeline.source(
    stages.KafkaSource(
        brokers="localhost:9092",
        topic="raw-events",
        group="pipeline-v1",
    )
).transform(
    stages.JsonParse(),
    stages.Filter(lambda e: e["type"] == "click"),
    stages.Enrich(geo_lookup=True),
).sink(
    stages.PostgresSink(
        dsn="postgresql://localhost/analytics",
        table="click_events",
        batch_size=500,
    )
)

# Run with automatic backpressure
pipeline.run(workers=4)
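The flow control described above, where a full buffer pauses the producer instead of crashing it, can be illustrated in plain Python with a bounded `asyncio.Queue`. This is a standalone sketch of the general technique, not pipelinecast internals:

```python
import asyncio

async def producer(q: asyncio.Queue, n: int) -> None:
    for i in range(n):
        # put() suspends when the queue is full, so a slow consumer
        # naturally throttles the producer instead of overwhelming it.
        await q.put(i)
    await q.put(None)  # sentinel: no more items

async def consumer(q: asyncio.Queue, out: list) -> None:
    while (item := await q.get()) is not None:
        await asyncio.sleep(0.001)  # simulate a slow sink
        out.append(item)

async def main() -> list:
    q: asyncio.Queue = asyncio.Queue(maxsize=8)  # per-stage buffer policy
    out: list = []
    await asyncio.gather(producer(q, 100), consumer(q, out))
    return out

received = asyncio.run(main())
```

The queue's `maxsize` plays the role of a stage's buffer policy: no matter how fast the producer is, at most eight items are ever in flight.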
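A custom connector can indeed stay under 20 lines. The sketch below shows the shape of one; the `Source` base class and its `read()` protocol are assumptions for illustration, not pipelinecast's actual connector interface:

```python
from typing import Iterator

# Hypothetical minimal source interface, assumed for this sketch.
class Source:
    def read(self) -> Iterator[dict]:
        raise NotImplementedError

class TailFileSource(Source):
    """Emit one record per line of a log file."""

    def __init__(self, path: str):
        self.path = path

    def read(self) -> Iterator[dict]:
        with open(self.path) as f:
            for line in f:
                yield {"raw": line.rstrip("\n")}
```

Because a source is just an iterator of records, downstream transforms and sinks compose with it without caring where the data came from.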