# API Reference

PipelineCast is built around a small set of composable types: `Pipeline`, `Source`, `Transform`, and `Sink`. All stage types implement the `Stage` protocol.
## Pipeline

```python
class Pipeline(name: str, *, workers: int = 1, config: dict | None = None)
```

The top-level object that wires together a `Source`, a chain of `Transform`s, and a `Sink`. A pipeline manages the lifecycle of all its stages and handles graceful shutdown on SIGINT/SIGTERM.
| Method | Returns | Description |
|---|---|---|
| `pipeline.source(src, **opts)` | `Pipeline` | Set the pipeline source. Accepts buffer overrides as keyword arguments. |
| `pipeline.transform(*stages)` | `Pipeline` | Append one or more transforms to the processing chain. |
| `pipeline.sink(snk, **opts)` | `Pipeline` | Set the pipeline sink. Returns the pipeline for final configuration. |
| `pipeline.run(*, workers=None)` | `None` | Start the pipeline. Blocks until stopped or an unrecoverable error occurs. |
| `pipeline.stop(timeout=30.0)` | `None` | Gracefully stop all stages. Flushes pending records before shutdown. |
| `pipeline.metrics()` | `PipelineMetrics` | Return current throughput, buffer usage, and error counts per stage. |
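Because `source()`, `transform()`, and `sink()` each return the `Pipeline`, calls chain fluently. The pattern behind that is simply methods returning `self`; here is a minimal standalone sketch of the pattern (an illustrative stand-in, not the real `Pipeline` class):

```python
class FluentPipeline:
    """Illustrates the return-self pattern behind PipelineCast's chaining API."""

    def __init__(self, name: str):
        self.name = name
        self.stages: list[str] = []

    def source(self, src: str) -> "FluentPipeline":
        self.stages.append(f"source:{src}")
        return self  # returning self is what enables chaining

    def transform(self, *stages: str) -> "FluentPipeline":
        self.stages.extend(f"transform:{s}" for s in stages)
        return self

    def sink(self, snk: str) -> "FluentPipeline":
        self.stages.append(f"sink:{snk}")
        return self

p = FluentPipeline("demo").source("kafka").transform("parse", "enrich").sink("s3")
print(p.stages)  # → ['source:kafka', 'transform:parse', 'transform:enrich', 'sink:s3']
```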
## Stage Protocol

All stages implement the `Stage` protocol. You rarely need to implement it directly; use the specialized base classes instead.
```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class Stage(Protocol):
    def open(self) -> None:
        """Called once when the pipeline starts."""
        ...

    def close(self) -> None:
        """Called once during graceful shutdown."""
        ...

    def health(self) -> bool:
        """Return True if the stage is operational."""
        ...
```
## Source

A `Source` produces records. It must implement `poll()`, which returns an iterable of records, or `None` when exhausted.
```python
from pipelinecast import Source

class MySource(Source):
    def open(self):
        self.conn = create_connection()

    def poll(self, batch_size: int = 100) -> list[dict] | None:
        # Return a list of records, or None to signal end-of-stream
        rows = self.conn.fetch(batch_size)
        return rows if rows else None

    def close(self):
        self.conn.close()
```
## Transform

A `Transform` receives a record and returns a modified record, `None` to drop it, or a list of records for fan-out.
```python
from pipelinecast import Transform

class SplitLines(Transform):
    def process(self, record: dict) -> list[dict]:
        # Fan-out: one record per line
        return [
            {**record, "line": line}
            for line in record["body"].splitlines()
        ]
```
| Return Value | Behavior |
|---|---|
| `dict` | Forward one record downstream |
| `None` | Drop the record (filtered out) |
| `list[dict]` | Fan-out: forward multiple records downstream |
## Sink

A `Sink` consumes records. It receives batches for efficiency and must implement `write()`.
```python
import httpx

from pipelinecast import Sink

class HttpSink(Sink):
    def __init__(self, url: str, batch_size: int = 50):
        self.url = url
        self.batch_size = batch_size

    def write(self, records: list[dict]) -> None:
        response = httpx.post(self.url, json=records, timeout=10)
        # Raise on HTTP errors so failures surface instead of being silently dropped
        response.raise_for_status()
```
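Since `write()` receives batches, something upstream must group records into `batch_size`-sized chunks; a minimal chunking helper illustrating that grouping (an assumption about how batching works internally, not library code):

```python
def chunks(records: list[dict], batch_size: int) -> list[list[dict]]:
    """Split a record stream into write()-sized batches; the last may be short."""
    return [records[i:i + batch_size] for i in range(0, len(records), batch_size)]

records = [{"n": i} for i in range(120)]
print([len(b) for b in chunks(records, 50)])  # → [50, 50, 20]
```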
## PipelineMetrics

The `metrics()` method returns a dataclass with current pipeline statistics:
| Field | Type | Description |
|---|---|---|
| `records_in` | `int` | Total records read from the source |
| `records_out` | `int` | Total records written to the sink |
| `records_dropped` | `int` | Records filtered or dropped by transforms |
| `errors` | `int` | Total processing errors across all stages |
| `buffer_usage` | `dict[str, float]` | Buffer fill ratio per stage (0.0 to 1.0) |
| `uptime_seconds` | `float` | Seconds since `pipeline.run()` was called |
## Error Handling

PipelineCast distinguishes between transient and permanent errors. By default, transient errors (network timeouts, temporary unavailability) trigger automatic retry with exponential backoff, while permanent errors (schema violations, malformed data) route the record to a dead-letter queue.
```python
from pipelinecast import Pipeline
from pipelinecast.errors import RetryPolicy

pipeline = Pipeline("resilient")
pipeline.error_handler(
    retry=RetryPolicy(max_attempts=5, backoff_base=2.0),
    dead_letter="s3://my-bucket/dlq/",
)
```
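With `max_attempts=5` and `backoff_base=2.0`, a typical exponential policy waits `backoff_base ** attempt` seconds before each retry. The exact formula (and whether jitter is added) is an assumption here, not something the library documents above:

```python
def backoff_delays(max_attempts: int, backoff_base: float) -> list[float]:
    """Assumed schedule: delay before retry k is backoff_base ** k, k starting at 0.
    max_attempts counts the initial try, so there are max_attempts - 1 retries."""
    return [backoff_base ** attempt for attempt in range(max_attempts - 1)]

print(backoff_delays(5, 2.0))  # → [1.0, 2.0, 4.0, 8.0]
```

So a record that keeps failing transiently is retried four times over roughly 15 seconds before it is given up on (and, with a `dead_letter` target configured, routed to the dead-letter queue).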