API Reference

PipelineCast is built around a small set of composable types: Pipeline, Source, Transform, and Sink. All stage types implement the Stage protocol.

Pipeline

class Pipeline(name: str, *, workers: int = 1, config: dict | None = None)

The top-level object that wires together a Source, a chain of Transforms, and a Sink. A pipeline manages the lifecycle of all its stages and handles graceful shutdown on SIGINT/SIGTERM.

| Method | Returns | Description |
| --- | --- | --- |
| pipeline.source(src, **opts) | Pipeline | Set the pipeline source. Accepts buffer overrides as keyword arguments. |
| pipeline.transform(*stages) | Pipeline | Append one or more transforms to the processing chain. |
| pipeline.sink(snk, **opts) | Pipeline | Set the pipeline sink. Returns the pipeline for final configuration. |
| pipeline.run(*, workers=None) | None | Start the pipeline. Blocks until stopped or an unrecoverable error occurs. |
| pipeline.stop(timeout=30.0) | None | Gracefully stop all stages. Flushes pending records before shutdown. |
| pipeline.metrics() | PipelineMetrics | Return current throughput, buffer usage, and error counts per stage. |
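
Because each builder method returns the pipeline, stages are typically wired up as a chain. A minimal sketch, using the MySource, SplitLines, and HttpSink classes defined later in this reference (the endpoint URL is a placeholder):

from pipelinecast import Pipeline

pipeline = Pipeline("ingest", workers=4)
(
    pipeline.source(MySource())
    .transform(SplitLines())
    .sink(HttpSink("https://example.com/ingest"))
)
pipeline.run()  # blocks until stop() is called or an unrecoverable error occurs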

Stage Protocol

All stages implement the Stage protocol. You rarely need to implement this directly; use the specialized base classes instead.

from typing import Protocol, runtime_checkable

@runtime_checkable
class Stage(Protocol):
    def open(self) -> None:
        """Called once when the pipeline starts."""
        ...

    def close(self) -> None:
        """Called once during graceful shutdown."""
        ...

    def health(self) -> bool:
        """Return True if the stage is operational."""
        ...
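
Because Stage is declared @runtime_checkable, any object that provides the three methods passes an isinstance() check structurally, without inheriting from Stage. A small sketch, continuing from the protocol definition above:

class NullStage:
    """Not a Stage subclass, but structurally compatible."""

    def open(self) -> None:
        pass

    def close(self) -> None:
        pass

    def health(self) -> bool:
        return True

assert isinstance(NullStage(), Stage)  # True: all required methods are present

Note that runtime_checkable protocols only verify that the method names exist; argument and return types are not checked at runtime.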

Source

A Source produces records. It must implement poll(), which returns a batch of records, or None once the source is exhausted.

from pipelinecast import Source

class MySource(Source):
    def open(self):
        self.conn = create_connection()

    def poll(self, batch_size: int = 100) -> list[dict] | None:
        # Return a list of records, or None to signal end-of-stream
        rows = self.conn.fetch(batch_size)
        return rows if rows else None

    def close(self):
        self.conn.close()
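
Sources do not have to wrap external systems. A hypothetical in-memory source, handy for tests, that shows the end-of-stream contract; it assumes the Source base class supplies no-op open(), close(), and health() defaults:

from pipelinecast import Source

class ListSource(Source):
    def __init__(self, items: list[dict]):
        self.items = list(items)

    def poll(self, batch_size: int = 100) -> list[dict] | None:
        if not self.items:
            return None  # end-of-stream
        batch, self.items = self.items[:batch_size], self.items[batch_size:]
        return batch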

Transform

A Transform receives a record and returns a modified record, None to drop it, or a list of records for fan-out.

from pipelinecast import Transform

class SplitLines(Transform):
    def process(self, record: dict) -> list[dict]:
        # Fan-out: one record per line
        return [
            {**record, "line": line}
            for line in record["body"].splitlines()
        ]

| Return Value | Behavior |
| --- | --- |
| dict | Forward one record downstream |
| None | Drop the record (filtered out) |
| list[dict] | Fan-out: forward multiple records downstream |
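
Returning None turns a transform into a filter. A minimal sketch (the "body" field name is illustrative):

from pipelinecast import Transform

class DropEmptyBody(Transform):
    def process(self, record: dict) -> dict | None:
        # Drop records with a missing or empty "body"; forward everything else.
        if not record.get("body"):
            return None
        return record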

Sink

A Sink consumes records. It receives batches for efficiency and must implement write().

import httpx

from pipelinecast import Sink

class HttpSink(Sink):
    def __init__(self, url: str, batch_size: int = 50):
        self.url = url
        self.batch_size = batch_size

    def write(self, records: list[dict]) -> None:
        # Post the whole batch in a single request; raising on HTTP errors lets
        # the pipeline's error handling retry or dead-letter the batch.
        httpx.post(self.url, json=records, timeout=10).raise_for_status()
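
For comparison, a hypothetical sink that also uses the open()/close() lifecycle, appending each record as one JSON line to a local file (path handling kept deliberately simple):

import json

from pipelinecast import Sink

class JsonlFileSink(Sink):
    def __init__(self, path: str):
        self.path = path

    def open(self) -> None:
        # The built-in open(), not this method.
        self._fh = open(self.path, "a", encoding="utf-8")

    def write(self, records: list[dict]) -> None:
        for record in records:
            self._fh.write(json.dumps(record) + "\n")

    def close(self) -> None:
        self._fh.close()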

PipelineMetrics

The metrics() method returns a dataclass with current pipeline statistics:

| Field | Type | Description |
| --- | --- | --- |
| records_in | int | Total records read from the source |
| records_out | int | Total records written to the sink |
| records_dropped | int | Records filtered or dropped by transforms |
| errors | int | Total processing errors across all stages |
| buffer_usage | dict[str, float] | Buffer fill ratio per stage (0.0 to 1.0) |
| uptime_seconds | float | Seconds since pipeline.run() was called |
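
A short sketch of reading metrics from a running pipeline (only the fields listed above are used; pipeline refers to an already started Pipeline):

m = pipeline.metrics()
print(f"in={m.records_in} out={m.records_out} dropped={m.records_dropped} errors={m.errors}")

# Warn about stages whose buffers are close to full.
for stage, fill in m.buffer_usage.items():
    if fill > 0.9:
        print(f"stage {stage!r} buffer at {fill:.0%}")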

Error Handling

PipelineCast distinguishes between transient and permanent errors. By default, transient errors (network timeouts, temporary unavailability) trigger automatic retry with exponential backoff. Permanent errors (schema violations, malformed data) route the record to a dead-letter queue.

from pipelinecast import Pipeline
from pipelinecast.errors import RetryPolicy

pipeline = Pipeline("resilient")
pipeline.error_handler(
    retry=RetryPolicy(max_attempts=5, backoff_base=2.0),
    dead_letter="s3://my-bucket/dlq/",
)
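
As a rough illustration of what this policy means in practice (the exact delay formula is an assumption here, not part of the documented API):

# Assuming each retry waits backoff_base ** attempt seconds:
backoff_base, max_attempts = 2.0, 5
delays = [backoff_base ** attempt for attempt in range(1, max_attempts)]
print(delays)  # [2.0, 4.0, 8.0, 16.0] -> waits between the five attempts

Once the attempts are exhausted, the record is routed to the dead-letter queue configured above.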