Getting Started

Installation

PipelineCast requires Python 3.10 or later. Install from PyPI:

pip install pipelinecast

To install with all optional connector dependencies:

pip install pipelinecast[all]

Or pick only the connectors you need:

pip install pipelinecast[kafka,postgres,s3]

Your First Pipeline

A pipeline reads from a Source, applies one or more Transforms, and writes to a Sink. Here is a minimal example that reads JSON lines from a file, filters records, and prints results to stdout:

from pipelinecast import Pipeline, stages

# Create a named pipeline
pipeline = Pipeline("my-first-pipeline")

pipeline.source(
    stages.FileSource(path="events.jsonl", format="json")
).transform(
    stages.Filter(lambda row: row["status"] == "active"),
    stages.Select("id", "name", "email"),
).sink(
    stages.StdoutSink(format="json")
)

pipeline.run()
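To see what the Filter and Select stages do to each record, the same logic can be sketched in plain Python, with no PipelineCast required (the sample records here are hypothetical):

```python
# Hypothetical sample records, as they might appear in events.jsonl
records = [
    {"id": 1, "name": "Ada", "email": "ada@example.com", "status": "active"},
    {"id": 2, "name": "Bob", "email": "bob@example.com", "status": "inactive"},
]

# Filter: keep only records whose status is "active"
active = [r for r in records if r["status"] == "active"]

# Select: keep only the named fields from each record
selected = [{k: r[k] for k in ("id", "name", "email")} for r in active]

print(selected)  # [{'id': 1, 'name': 'Ada', 'email': 'ada@example.com'}]
```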

Running the Pipeline

Save the code above as my_pipeline.py and run it:

python my_pipeline.py

You can also use the CLI to run a pipeline defined in a YAML configuration file:

pipelinecast run --config pipeline.yaml

Add --workers 4 to enable parallel processing across multiple threads. PipelineCast handles partitioning and backpressure automatically.
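A YAML configuration equivalent to the Python example above might look like the following. This is a sketch of a plausible schema, not a guaranteed one; check your version's configuration reference for the exact keys, and note that a Python lambda (as used in Filter) generally cannot be expressed in YAML:

```yaml
# pipeline.yaml -- hypothetical schema
name: my-first-pipeline
source:
  type: FileSource
  path: events.jsonl
  format: json
transforms:
  - type: Select
    fields: [id, name, email]
sink:
  type: StdoutSink
  format: json
```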

Core Concepts

Every PipelineCast pipeline consists of three stage types:

Stage Type | Role                                    | Examples
Source     | Reads records from an external system   | KafkaSource, FileSource, PostgresCDCSource
Transform  | Modifies, filters, or enriches records  | Filter, Map, JsonParse, Enrich, Aggregate
Sink       | Writes records to a destination         | PostgresSink, S3Sink, KafkaSink, StdoutSink

Stages are connected through internal bounded queues. When a Sink falls behind, backpressure propagates upstream automatically, slowing the Source instead of dropping records or exhausting memory.
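The bounded-queue mechanism behind backpressure can be illustrated with the standard library. This is a generic sketch of the technique, not PipelineCast's actual internals: put() on a full queue.Queue blocks the producer until the consumer catches up, which is how backpressure slows the upstream stage instead of dropping records.

```python
import queue
import threading

# A small bounded queue: put() blocks when the queue is full,
# so a slow consumer naturally throttles the producer.
q = queue.Queue(maxsize=2)
consumed = []

def producer():
    for i in range(5):
        q.put(i)   # blocks here whenever the consumer falls behind
    q.put(None)    # sentinel: no more records

def consumer():
    while (item := q.get()) is not None:
        consumed.append(item)

t_prod = threading.Thread(target=producer)
t_cons = threading.Thread(target=consumer)
t_prod.start(); t_cons.start()
t_prod.join(); t_cons.join()

print(consumed)  # [0, 1, 2, 3, 4]
```

No records are lost and memory stays bounded at maxsize items, regardless of how much faster the producer is.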

Custom Transform

You can write a custom transform by subclassing Transform:

from pipelinecast import Transform

class NormalizeEmail(Transform):
    def process(self, record):
        record["email"] = record["email"].strip().lower()
        return record

Then use it in your pipeline just like built-in stages:

pipeline.source(...).transform(
    stages.JsonParse(),
    NormalizeEmail(),
    stages.Filter(lambda r: "@" in r["email"]),
).sink(...)
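Because process operates on a plain dict, the normalization logic can be checked on its own before wiring it into a pipeline. A standalone sketch of the same logic:

```python
# The same normalization NormalizeEmail.process applies, on a plain dict
record = {"email": "  Ada.Lovelace@Example.COM "}
record["email"] = record["email"].strip().lower()
print(record["email"])  # ada.lovelace@example.com
```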

Next Steps