Configuration
PipelineCast supports configuration via YAML files, environment variables, or the programmatic API. YAML is recommended for production deployments because it keeps pipeline definitions version-controlled and reproducible.
Configuration File
By default, PipelineCast looks for pipelinecast.yaml in the current directory. Specify a custom path with the --config flag or the PIPELINECAST_CONFIG environment variable.
```yaml
# pipelinecast.yaml
pipeline:
  name: "click-stream-etl"
  workers: 4

source:
  type: "kafka"
  brokers: "kafka-1:9092,kafka-2:9092"
  topic: "raw-clicks"
  group: "etl-v2"
  offset_reset: "earliest"

transforms:
  - type: "json_parse"
  - type: "filter"
    condition: "record['event'] == 'click'"
  - type: "enrich"
    geo_lookup: true
    user_agent_parse: true

sink:
  type: "postgres"
  dsn: "postgresql://etl:${DB_PASSWORD}@db:5432/analytics"
  table: "click_events"
  batch_size: 1000
  on_conflict: "ignore"

buffer:
  capacity: 8192
  policy: "block"

logging:
  level: "info"
  format: "json"
```
Environment Variables
Any configuration value can be overridden with an environment variable. The naming convention is PIPELINECAST_ followed by the uppercased path with underscores as delimiters.
| Variable | Config Path | Example |
|---|---|---|
| PIPELINECAST_PIPELINE_NAME | pipeline.name | "click-stream-etl" |
| PIPELINECAST_PIPELINE_WORKERS | pipeline.workers | 8 |
| PIPELINECAST_SOURCE_BROKERS | source.brokers | "kafka:9092" |
| PIPELINECAST_BUFFER_CAPACITY | buffer.capacity | 16384 |
| PIPELINECAST_LOG_LEVEL | logging.level | "debug" |
You can also reference environment variables inside the YAML file using ${VAR_NAME} syntax. Undefined variables raise a startup error unless a default is provided: ${VAR_NAME:-default}.
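For example, the Postgres DSN from the configuration above can fall back to a default database name when an optional variable is unset (DB_NAME here is purely illustrative; DB_PASSWORD has no default, so it must be defined at startup):

```yaml
sink:
  type: "postgres"
  # DB_NAME falls back to "analytics" when unset; DB_PASSWORD must be defined.
  dsn: "postgresql://etl:${DB_PASSWORD}@db:5432/${DB_NAME:-analytics}"
```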
Logging
PipelineCast uses Python's standard logging module. The framework emits structured logs under the pipelinecast logger namespace.
| Parameter | Type | Default | Description |
|---|---|---|---|
| logging.level | string | "info" | Minimum log level (debug, info, warning, error) |
| logging.format | string | "text" | Output format: text or json |
| logging.file | string | "" | Optional file path; empty means stderr only |
| logging.include_metrics | bool | false | Emit periodic throughput metrics to the log |
Example JSON log output:
{"ts":"2026-01-15T08:32:11Z","level":"info","logger":"pipelinecast.pipeline","msg":"pipeline started","pipeline":"click-stream-etl","workers":4}
{"ts":"2026-01-15T08:32:12Z","level":"info","logger":"pipelinecast.source.kafka","msg":"consumer connected","brokers":"kafka-1:9092","topic":"raw-clicks","group":"etl-v2"}
Buffer and Backpressure
Each inter-stage queue is a bounded buffer. Under the default block policy, an upstream stage that fills its buffer blocks until space becomes available; this is the core backpressure mechanism that prevents memory exhaustion.
| Parameter | Type | Default | Description |
|---|---|---|---|
| buffer.capacity | int | 4096 | Maximum records in each inter-stage buffer |
| buffer.policy | string | "block" | Behavior when full: block, drop_oldest, or drop_newest |
| buffer.batch_flush_ms | int | 100 | Max time before a partial batch is flushed downstream |
| buffer.high_watermark | float | 0.8 | Buffer fill ratio that triggers a backpressure warning |
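For example, a latency-sensitive pipeline might favor freshness over completeness by dropping the oldest records under load and flushing partial batches sooner (the values below are illustrative, not recommendations):

```yaml
buffer:
  capacity: 16384
  policy: "drop_oldest"   # shed the oldest records instead of blocking upstream
  batch_flush_ms: 50      # flush partial batches after 50 ms
  high_watermark: 0.9     # warn later, since some dropping is expected here
```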
Programmatic override per stage:
```python
from pipelinecast import Pipeline, BufferPolicy

pipeline = Pipeline("heavy-load")

# source, transform, and sink are stage objects defined elsewhere.
pipeline.source(
    source,
    buffer_capacity=16384,                    # overrides buffer.capacity for this stage
    buffer_policy=BufferPolicy.DROP_OLDEST,   # overrides buffer.policy for this stage
).transform(
    transform,
).sink(sink)
```
Metrics Endpoint
Enable the built-in Prometheus-compatible metrics server to monitor pipeline health:
```yaml
metrics:
  enabled: true
  host: "0.0.0.0"
  port: 9090
  path: "/metrics"
```
Exposed metrics include pipelinecast_records_processed_total, pipelinecast_buffer_usage_ratio, pipelinecast_stage_latency_seconds, and pipelinecast_errors_total.
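If you collect these with Prometheus, a minimal scrape job pointing at the endpoint above could look like the following (the job name and target host are placeholders for your environment):

```yaml
# prometheus.yml (excerpt)
scrape_configs:
  - job_name: "pipelinecast"
    metrics_path: "/metrics"
    static_configs:
      - targets: ["pipeline-host:9090"]
```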