Configuration

PipelineCast supports configuration via YAML files, environment variables, or a programmatic API. YAML is recommended for production deployments because it gives you version-controlled, reproducible pipeline definitions.

Configuration File

By default, PipelineCast looks for pipelinecast.yaml in the current directory. Specify a custom path with the --config flag or the PIPELINECAST_CONFIG environment variable.

# pipelinecast.yaml

pipeline:
  name: "click-stream-etl"
  workers: 4

source:
  type: "kafka"
  brokers: "kafka-1:9092,kafka-2:9092"
  topic: "raw-clicks"
  group: "etl-v2"
  offset_reset: "earliest"

transforms:
  - type: "json_parse"
  - type: "filter"
    condition: "record['event'] == 'click'"
  - type: "enrich"
    geo_lookup: true
    user_agent_parse: true

sink:
  type: "postgres"
  dsn: "postgresql://etl:${DB_PASSWORD}@db:5432/analytics"
  table: "click_events"
  batch_size: 1000
  on_conflict: "ignore"

buffer:
  capacity: 8192
  policy: "block"

logging:
  level: "info"
  format: "json"

Environment Variables

Any configuration value can be overridden with an environment variable. The naming convention is PIPELINECAST_ followed by the configuration path uppercased, with dots replaced by underscores.
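The mapping can be sketched in a few lines. Note that env_var_name and override below are illustrative helpers written for this page, not part of the PipelineCast API:

```python
import os

def env_var_name(config_path, prefix="PIPELINECAST_"):
    """Map a dotted config path to its override variable name."""
    return prefix + config_path.replace(".", "_").upper()

def override(config_path, current_value):
    """Return the environment override for a config path, if one is set."""
    return os.environ.get(env_var_name(config_path), current_value)
```

For example, env_var_name("pipeline.workers") yields "PIPELINECAST_PIPELINE_WORKERS".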

Variable                         Config Path        Example
PIPELINECAST_PIPELINE_NAME       pipeline.name      "click-stream-etl"
PIPELINECAST_PIPELINE_WORKERS    pipeline.workers   8
PIPELINECAST_SOURCE_BROKERS      source.brokers     "kafka:9092"
PIPELINECAST_BUFFER_CAPACITY     buffer.capacity    16384
PIPELINECAST_LOGGING_LEVEL       logging.level      "debug"

You can also reference environment variables inside the YAML file using ${VAR_NAME} syntax. Undefined variables raise a startup error unless a default is provided: ${VAR_NAME:-default}.
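The interpolation rule can be sketched with a small regex-based expander. This is a standalone approximation of the behavior described above, not PipelineCast's actual implementation:

```python
import os
import re

# Matches ${VAR} and ${VAR:-default}; the default group is optional.
_VAR = re.compile(r"\$\{(\w+)(?::-([^}]*))?\}")

def interpolate(text):
    """Expand ${VAR} and ${VAR:-default} references in a config string."""
    def replace(match):
        name, default = match.group(1), match.group(2)
        if name in os.environ:
            return os.environ[name]
        if default is not None:
            return default
        raise RuntimeError(f"undefined variable: {name}")
    return _VAR.sub(replace, text)
```

With DB_PASSWORD set in the environment, the dsn value from the example above expands to a complete connection string; an unset variable without a default raises an error at startup.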

Logging

PipelineCast uses Python's standard logging module. The framework emits structured logs under the pipelinecast logger namespace.

Parameter               Type    Default  Description
logging.level           string  "info"   Minimum log level (debug, info, warning, error)
logging.format          string  "text"   Output format: text or json
logging.file            string  ""       Optional file path; empty means stderr only
logging.include_metrics bool    false    Emit periodic throughput metrics to the log
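Because the framework logs through the standard library, you can attach your own handler to the pipelinecast namespace. The JsonFormatter below is a minimal approximation of the JSON shape shown in the example output, not the framework's internal formatter:

```python
import json
import logging
import sys

class JsonFormatter(logging.Formatter):
    """Emit records as JSON objects resembling PipelineCast's json format."""
    def format(self, record):
        return json.dumps({
            "ts": self.formatTime(record, "%Y-%m-%dT%H:%M:%SZ"),
            "level": record.levelname.lower(),
            "logger": record.name,
            "msg": record.getMessage(),
        })

handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("pipelinecast")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
```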

Example JSON log output:

{"ts":"2026-01-15T08:32:11Z","level":"info","logger":"pipelinecast.pipeline","msg":"pipeline started","pipeline":"click-stream-etl","workers":4}
{"ts":"2026-01-15T08:32:12Z","level":"info","logger":"pipelinecast.source.kafka","msg":"consumer connected","brokers":"kafka-1:9092","topic":"raw-clicks","group":"etl-v2"}

Buffer and Backpressure

Each inter-stage queue is a bounded buffer. When a buffer fills up, the upstream stage blocks until space becomes available. This is the core backpressure mechanism that prevents memory exhaustion.
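The block policy is essentially a bounded queue whose put operation stalls when the queue is full. The sketch below demonstrates the mechanism with the standard library, independent of PipelineCast itself:

```python
import queue
import threading
import time

# A bounded queue models the "block" policy: when 4 records are in
# flight, the producer's put() stalls until the consumer frees a slot.
buf = queue.Queue(maxsize=4)
produced, consumed = [], []

def producer():
    for i in range(8):
        buf.put(i)          # blocks while the buffer is full
        produced.append(i)

def consumer():
    for _ in range(8):
        time.sleep(0.01)    # a slow consumer forces backpressure
        consumed.append(buf.get())

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()
```

Because the producer blocks rather than buffering without bound, memory usage stays proportional to the buffer capacity no matter how fast the source is.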

Parameter             Type    Default  Description
buffer.capacity       int     4096     Maximum records in each inter-stage buffer
buffer.policy         string  "block"  Behavior when full: block, drop_oldest, or drop_newest
buffer.batch_flush_ms int     100      Max time before a partial batch is flushed downstream
buffer.high_watermark float   0.8      Buffer fill ratio that triggers a backpressure warning
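The two drop policies differ in which end of the buffer loses a record. A rough model, using a bounded deque for drop_oldest and a length check for drop_newest (push_drop_newest is a hypothetical helper, not a PipelineCast function):

```python
from collections import deque

def push_drop_newest(buf, record):
    """drop_newest: discard the incoming record when the buffer is full."""
    if len(buf) < buf.maxlen:
        buf.append(record)

# drop_oldest: appending to a full bounded deque evicts from the far end.
oldest = deque(maxlen=4)
for record in range(8):
    oldest.append(record)    # records 0-3 are evicted as 4-7 arrive

newest = deque(maxlen=4)
for record in range(8):
    push_drop_newest(newest, record)   # records 4-7 are discarded
```

drop_oldest favors recency (useful for dashboards), while drop_newest preserves earlier records at the cost of the latest ones.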

Programmatic override per stage:

from pipelinecast import Pipeline, BufferPolicy

# source, transform, and sink are stage instances defined elsewhere
pipeline = Pipeline("heavy-load")
pipeline.source(
    source,
    buffer_capacity=16384,                   # overrides buffer.capacity
    buffer_policy=BufferPolicy.DROP_OLDEST,  # overrides buffer.policy
).transform(
    transform,
).sink(sink)

Metrics Endpoint

Enable the built-in Prometheus-compatible metrics server to monitor pipeline health:

metrics:
  enabled: true
  host: "0.0.0.0"
  port: 9090
  path: "/metrics"

Exposed metrics include pipelinecast_records_processed_total, pipelinecast_buffer_usage_ratio, pipelinecast_stage_latency_seconds, and pipelinecast_errors_total.
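The endpoint serves plain text in the Prometheus exposition format. As a point of reference, render_metrics below is a hypothetical sketch of how counters are rendered in that format; it is not part of PipelineCast:

```python
def render_metrics(counters):
    """Render a mapping of counter names to values in Prometheus
    text exposition format: a # TYPE line followed by the sample."""
    lines = []
    for name, value in sorted(counters.items()):
        lines.append(f"# TYPE {name} counter")
        lines.append(f"{name} {value}")
    return "\n".join(lines) + "\n"
```

Any Prometheus-compatible scraper pointed at host:port/path can collect these samples on its normal scrape interval.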