# Connectors

PipelineCast ships with 40+ connectors for common data systems. Install only the extras you need, e.g. `pip install "pipelinecast[kafka,s3]"` (the quotes keep shells like zsh from expanding the brackets). Each connector is available as both a Source and a Sink unless noted otherwise.
## Message Queues
| Connector | Extra | Source | Sink | Notes |
|---|---|---|---|---|
| Apache Kafka | [kafka] | Yes | Yes | Consumer groups, exactly-once via transactions |
| Confluent Cloud | [kafka] | Yes | Yes | SASL/OAUTHBEARER auth, Schema Registry |
| RabbitMQ | [rabbitmq] | Yes | Yes | AMQP 0.9.1, publisher confirms |
| Redis Streams | [redis] | Yes | Yes | Consumer groups, XREADGROUP |
| Redis Pub/Sub | [redis] | Yes | Yes | Pattern subscriptions, no persistence |
| Amazon SQS | [aws] | Yes | Yes | FIFO and standard queues, long polling |
| Google Pub/Sub | [gcp] | Yes | Yes | Ordering keys, dead-letter topics |
| Azure Service Bus | [azure] | Yes | Yes | Sessions, scheduled messages |
| NATS JetStream | [nats] | Yes | Yes | Durable consumers, key-value store |
| Apache Pulsar | [pulsar] | Yes | Yes | Multi-tenancy, tiered storage |
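The "long polling" note on the SQS row means the source blocks waiting for a message instead of busy-looping, then drains whatever else is immediately available into one batch. A minimal sketch of that pattern with Python's stdlib `queue` (the function name and parameters are illustrative, not PipelineCast API):

```python
import queue

def long_poll(q: queue.Queue, wait_seconds: float, max_messages: int = 10) -> list:
    """Block up to wait_seconds for the first message, then drain
    whatever else is already queued, up to max_messages."""
    batch = []
    try:
        # Blocking get: returns as soon as one message arrives,
        # or raises queue.Empty after the timeout.
        batch.append(q.get(timeout=wait_seconds))
        while len(batch) < max_messages:
            batch.append(q.get_nowait())  # drain without further waiting
    except queue.Empty:
        pass
    return batch

q = queue.Queue()
for i in range(3):
    q.put({"id": i})
print(long_poll(q, wait_seconds=0.1))  # three messages in one batch
```

The same shape scales down to an empty queue: the call simply returns an empty batch after the timeout rather than spinning.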
## Databases
| Connector | Extra | Source | Sink | Notes |
|---|---|---|---|---|
| PostgreSQL CDC | [postgres] | Yes | — | Logical replication, pgoutput plugin |
| PostgreSQL | [postgres] | Yes | Yes | Batch COPY, upsert, connection pool |
| MySQL Binlog | [mysql] | Yes | — | GTID-based replication, row events |
| MySQL | [mysql] | Yes | Yes | Batch INSERT, LOAD DATA INFILE |
| MongoDB Change Streams | [mongodb] | Yes | — | Resume tokens, full document lookup |
| MongoDB | [mongodb] | Yes | Yes | Bulk writes, ordered/unordered |
| ClickHouse | [clickhouse] | Yes | Yes | Native protocol, async inserts |
| DuckDB | [duckdb] | Yes | Yes | In-process OLAP, Parquet integration |
| SQLite | [sqlite] | Yes | Yes | WAL mode, no extra dependency |
| Elasticsearch | [elastic] | Yes | Yes | Scroll API source, bulk indexing sink |
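The "upsert" note on the PostgreSQL sink corresponds to PostgreSQL's `INSERT ... ON CONFLICT DO UPDATE`. A hedged sketch of how such a parameterized statement can be assembled (the helper name and `%(name)s` parameter style are assumptions for illustration, not the connector's internals):

```python
def build_upsert(table: str, columns: list, key: str) -> str:
    """Build a parameterized upsert statement for PostgreSQL:
    insert the row, or update every non-key column on conflict."""
    cols = ", ".join(columns)
    placeholders = ", ".join(f"%({c})s" for c in columns)
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in columns if c != key)
    return (
        f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) "
        f"ON CONFLICT ({key}) DO UPDATE SET {updates}"
    )

sql = build_upsert("orders", ["id", "total", "status"], key="id")
print(sql)
```

`EXCLUDED` refers to the row that failed to insert, so the update always applies the incoming values; a driver such as psycopg can then execute the statement once per row dict, or many rows per batch.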
## Cloud Storage
| Connector | Extra | Source | Sink | Notes |
|---|---|---|---|---|
| Amazon S3 | [aws] | Yes | Yes | SQS notifications, multipart upload |
| Google Cloud Storage | [gcp] | Yes | Yes | Pub/Sub notifications, resumable uploads |
| Azure Blob Storage | [azure] | Yes | Yes | Event Grid triggers, append blobs |
| MinIO / S3-compatible | [aws] | Yes | Yes | Custom endpoint_url |
| Local Filesystem | — | Yes | Yes | Glob patterns, file watching (inotify) |
| SFTP | [sftp] | Yes | Yes | Key auth, directory polling |
| HDFS | [hdfs] | Yes | Yes | WebHDFS REST API |
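The Local Filesystem source's glob note boils down to pattern matching over a directory tree. A minimal stdlib sketch of that discovery step (the function and the demo paths are illustrative only):

```python
import tempfile
from pathlib import Path

def discover(root: str, pattern: str) -> list:
    """Return the sorted names of files under root matching a glob pattern."""
    base = Path(root)
    return sorted(p.name for p in base.glob(pattern))

# Demo against a throwaway directory tree.
with tempfile.TemporaryDirectory() as d:
    events = Path(d) / "events"
    events.mkdir()
    (events / "a.jsonl").write_text("{}\n")
    (events / "b.csv").write_text("x\n")
    print(discover(d, "events/*.jsonl"))  # only the .jsonl file matches
```

A watching source layers change notification (e.g. inotify, per the table) on top of the same matching logic so new files are picked up without rescanning.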
## Formats (Transforms)
| Format | Extra | Parse | Serialize | Notes |
|---|---|---|---|---|
| JSON / JSON Lines | — | Yes | Yes | Built-in, orjson acceleration optional |
| Apache Avro | [avro] | Yes | Yes | Schema Registry support |
| Apache Parquet | [parquet] | Yes | Yes | Column pruning, predicate pushdown |
| CSV / TSV | — | Yes | Yes | Built-in, configurable delimiter/quoting |
| Protocol Buffers | [protobuf] | Yes | Yes | Compiled descriptor files |
| MessagePack | [msgpack] | Yes | Yes | Binary, compact encoding |
| Apache Arrow IPC | [arrow] | Yes | Yes | Zero-copy, streaming batches |
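JSON Lines, the built-in default, is simply one JSON document per newline-delimited line. A quick stdlib round trip shows the framing (nothing here is PipelineCast-specific):

```python
import json

records = [{"event_id": 1, "action": "click"}, {"event_id": 2, "action": "view"}]

# Serialize: one compact JSON object per line.
payload = "\n".join(json.dumps(r, separators=(",", ":")) for r in records)

# Parse: split on newlines, skip blanks, decode each line independently.
parsed = [json.loads(line) for line in payload.splitlines() if line.strip()]

assert parsed == records
print(payload)
```

Because each line is decoded independently, a malformed record can be dead-lettered without aborting the rest of the batch.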
## HTTP
| Connector | Extra | Source | Sink | Notes |
|---|---|---|---|---|
| Webhook Receiver | — | Yes | — | Built-in HTTP server, signature verification |
| REST Polling | — | Yes | — | Configurable interval, cursor pagination |
| HTTP POST | — | — | Yes | Batched POST, retry with backoff |
| Server-Sent Events | — | Yes | — | Auto-reconnect, last-event-id |
| WebSocket | [websocket] | Yes | Yes | wss://, ping/pong keepalive |
| GraphQL Subscriptions | [websocket] | Yes | — | graphql-ws protocol |
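The Webhook Receiver's "signature verification" note refers to the common HMAC-SHA256 scheme: the sender signs the raw request body with a shared secret, and the receiver recomputes the digest and compares in constant time. A minimal stdlib sketch (the header name, hex encoding, and function name are assumptions; check your provider's webhook docs):

```python
import hashlib
import hmac

def verify_signature(body: bytes, secret: bytes, received_hex: str) -> bool:
    """Recompute HMAC-SHA256 over the raw body and compare in constant time."""
    expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, received_hex)

secret = b"s3cret"
body = b'{"order": 42}'
sig = hmac.new(secret, body, hashlib.sha256).hexdigest()
print(verify_signature(body, secret, sig))         # valid signature
print(verify_signature(b"tampered", secret, sig))  # rejected
```

`hmac.compare_digest` matters here: a naive `==` comparison leaks timing information an attacker could use to forge signatures byte by byte.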
## Examples
### Kafka to PostgreSQL
```python
from pipelinecast import Pipeline, stages

pipeline = Pipeline("kafka-to-pg")
pipeline.source(
    stages.KafkaSource(
        brokers="kafka:9092",
        topic="orders",
        group="pg-sink-v1",
    )
).transform(
    stages.AvroDeserialize(registry="http://schema-registry:8081"),
    stages.Filter(lambda r: r["total"] > 0),
).sink(
    stages.PostgresSink(
        dsn="postgresql://app:secret@db/warehouse",
        table="orders",
        batch_size=500,
    )
)
pipeline.run(workers=4)
```
### S3 Parquet to ClickHouse
```python
from pipelinecast import Pipeline, stages

pipeline = Pipeline("s3-to-clickhouse")
pipeline.source(
    stages.S3Source(
        bucket="data-lake",
        prefix="events/2026/01/",
        format="parquet",
        region="us-east-1",
    )
).transform(
    stages.Select("event_id", "user_id", "action", "ts"),
    stages.Cast({"ts": "datetime"}),
).sink(
    stages.ClickHouseSink(
        host="clickhouse:9000",
        table="events_raw",
        batch_size=10000,
    )
)
pipeline.run()
```
### Webhook to Redis Streams
```python
import time

from pipelinecast import Pipeline, stages

pipeline = Pipeline("webhooks")
pipeline.source(
    stages.WebhookSource(
        host="0.0.0.0",
        port=8080,
        path="/ingest",
        verify_signature="sha256",
        secret_env="WEBHOOK_SECRET",
    )
).transform(
    stages.JsonParse(),
    # Stamp each record with the ingest time.
    stages.AddField("received_at", lambda r: time.time()),
).sink(
    stages.RedisSink(
        url="redis://localhost:6379",
        stream="incoming-webhooks",
        maxlen=100000,
    )
)
pipeline.run()
```