# Getting Started

## Installation

PipelineCast requires Python 3.10 or later. Install from PyPI:

```bash
pip install pipelinecast
```

To install with all optional connector dependencies:

```bash
pip install "pipelinecast[all]"
```

Or pick only the connectors you need:

```bash
pip install "pipelinecast[kafka,postgres,s3]"
```

(The quotes keep the bracket syntax from being interpreted by shells such as zsh.)
## Your First Pipeline

A pipeline reads from a Source, applies one or more Transforms, and writes to a Sink. Here is a minimal example that reads JSON lines from a file, filters records, and prints the results to stdout:
```python
from pipelinecast import Pipeline, stages

# Create a named pipeline
pipeline = Pipeline("my-first-pipeline")

pipeline.source(
    stages.FileSource(path="events.jsonl", format="json")
).transform(
    stages.Filter(lambda row: row["status"] == "active"),
    stages.Select("id", "name", "email"),
).sink(
    stages.StdoutSink(format="json")
)

pipeline.run()
```
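To make the data flow concrete, here is a plain-Python sketch of what the `Filter` and `Select` stages do to a couple of records. This does not use PipelineCast at all, and the sample records are invented for illustration:

```python
import json

# Two sample lines as they might appear in events.jsonl (invented data).
lines = [
    '{"id": 1, "name": "Ada", "email": "ada@example.com", "status": "active"}',
    '{"id": 2, "name": "Bob", "email": "bob@example.com", "status": "inactive"}',
]

rows = [json.loads(line) for line in lines]

# Filter: keep only records whose status is "active".
active = [r for r in rows if r["status"] == "active"]

# Select: project each record down to the named fields.
selected = [{k: r[k] for k in ("id", "name", "email")} for r in active]
# selected == [{"id": 1, "name": "Ada", "email": "ada@example.com"}]
```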
## Running the Pipeline

Save the code above as `my_pipeline.py` and run it:

```bash
python my_pipeline.py
```

You can also use the CLI to run a pipeline defined in a YAML configuration file:

```bash
pipelinecast run --config pipeline.yaml
```

Add `--workers 4` to enable parallel processing across multiple threads. PipelineCast handles partitioning and backpressure automatically.
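For reference, a YAML configuration equivalent to the Python example might look like the sketch below. The key names (`source`, `transforms`, `sink`, and so on) are assumptions for illustration, not the authoritative schema; see the Configuration docs for the real format.

```yaml
# Hypothetical pipeline.yaml — the field names here are illustrative
# guesses, not the documented schema.
name: my-first-pipeline
source:
  type: file
  path: events.jsonl
  format: json
transforms:
  - type: filter
    expr: row["status"] == "active"
  - type: select
    fields: [id, name, email]
sink:
  type: stdout
  format: json
```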
## Core Concepts
Every PipelineCast pipeline consists of three stage types:
| Stage Type | Role | Examples |
|---|---|---|
| Source | Reads records from an external system | KafkaSource, FileSource, PostgresCDCSource |
| Transform | Modifies, filters, or enriches records | Filter, Map, JsonParse, Enrich, Aggregate |
| Sink | Writes records to a destination | PostgresSink, S3Sink, KafkaSink, StdoutSink |
Stages are connected through internal bounded queues. When a Sink falls behind, backpressure propagates upstream automatically, slowing the Source instead of dropping records or exhausting memory.
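The backpressure mechanism can be illustrated with Python's standard-library `queue` module. This is a generic sketch of how bounded queues throttle a producer, not PipelineCast's actual internals:

```python
import queue

# A small bounded queue, like the ones connecting stages (illustrative only).
buf = queue.Queue(maxsize=2)

buf.put("record-1")
buf.put("record-2")  # the queue is now at capacity

try:
    # A blocking put() would stall here until the consumer drains an item,
    # which is exactly how a full queue slows the upstream stage.
    buf.put_nowait("record-3")
    blocked = False
except queue.Full:
    blocked = True  # producer must wait: backpressure

buf.get()            # the sink drains one record...
buf.put("record-3")  # ...and the source can proceed again
```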
## Custom Transform

You can write a custom transform by subclassing `Transform`:
```python
from pipelinecast import Transform

class NormalizeEmail(Transform):
    def process(self, record):
        record["email"] = record["email"].strip().lower()
        return record
```
Then use it in your pipeline just like built-in stages:
```python
pipeline.source(...).transform(
    stages.JsonParse(),
    NormalizeEmail(),
    stages.Filter(lambda r: "@" in r["email"]),
).sink(...)
```
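As a mental model, a transform chain is just a sequence of per-record functions, where a filter can drop the record. The sketch below is plain Python with invented helper names, not PipelineCast's implementation:

```python
# Illustrative equivalents of NormalizeEmail and the Filter stage.
def normalize_email(record):
    record["email"] = record["email"].strip().lower()
    return record

def keep_valid(record):
    # Returning None drops the record, like a failing Filter predicate.
    return record if "@" in record["email"] else None

def apply_chain(record, transforms):
    for transform in transforms:
        record = transform(record)
        if record is None:  # a filter dropped the record
            break
    return record

row = {"id": 1, "email": "  Alice@Example.COM "}
result = apply_chain(row, [normalize_email, keep_valid])
# result["email"] == "alice@example.com"
```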
## Next Steps
- Learn about YAML config files and environment variables in Configuration
- Explore the full API Reference for Pipeline, Stage, and metrics
- Browse 40+ connectors in the Connectors catalog