Connectors

PipelineCast ships with 40+ connectors for common data systems. Install only the extras you need, e.g. `pip install "pipelinecast[kafka,s3]"` (the quotes keep some shells from expanding the brackets). Each connector is available as both a Source and a Sink unless noted otherwise.

Message Queues

| Connector | Extra | Source | Sink | Notes |
|---|---|---|---|---|
| Apache Kafka | `[kafka]` | Yes | Yes | Consumer groups, exactly-once via transactions |
| Confluent Cloud | `[kafka]` | Yes | Yes | SASL/OAUTHBEARER auth, Schema Registry |
| RabbitMQ | `[rabbitmq]` | Yes | Yes | AMQP 0.9.1, publisher confirms |
| Redis Streams | `[redis]` | Yes | Yes | Consumer groups, XREADGROUP |
| Redis Pub/Sub | `[redis]` | Yes | Yes | Pattern subscriptions, no persistence |
| Amazon SQS | `[aws]` | Yes | Yes | FIFO and standard queues, long polling |
| Google Pub/Sub | `[gcp]` | Yes | Yes | Ordering keys, dead-letter topics |
| Azure Service Bus | `[azure]` | Yes | Yes | Sessions, scheduled messages |
| NATS JetStream | `[nats]` | Yes | Yes | Durable consumers, key-value store |
| Apache Pulsar | `[pulsar]` | Yes | Yes | Multi-tenancy, tiered storage |
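Queue connectors compose like any other source/sink pair. A hedged sketch bridging RabbitMQ into Kafka; the stage names `RabbitMQSource` and `KafkaSink` and their parameters are assumptions, extrapolated from the `KafkaSource`/`PostgresSink` naming in the Examples section, not confirmed here:

```python
from pipelinecast import Pipeline, stages

# Hypothetical bridge: consume from a RabbitMQ queue, republish to Kafka.
pipeline = Pipeline("rabbit-to-kafka")

pipeline.source(
    stages.RabbitMQSource(   # assumed name; requires the [rabbitmq] extra
        url="amqp://guest:guest@rabbitmq:5672/",
        queue="events",
    )
).sink(
    stages.KafkaSink(        # assumed name; requires the [kafka] extra
        brokers="kafka:9092",
        topic="events",
    )
)

pipeline.run()
```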

Databases

| Connector | Extra | Source | Sink | Notes |
|---|---|---|---|---|
| PostgreSQL CDC | `[postgres]` | Yes | — | Logical replication, pgoutput plugin |
| PostgreSQL | `[postgres]` | Yes | Yes | Batch COPY, upsert, connection pool |
| MySQL Binlog | `[mysql]` | Yes | — | GTID-based replication, row events |
| MySQL | `[mysql]` | Yes | Yes | Batch INSERT, LOAD DATA INFILE |
| MongoDB Change Streams | `[mongodb]` | Yes | — | Resume tokens, full document lookup |
| MongoDB | `[mongodb]` | Yes | Yes | Bulk writes, ordered/unordered |
| ClickHouse | `[clickhouse]` | Yes | Yes | Native protocol, async inserts |
| DuckDB | `[duckdb]` | Yes | Yes | In-process OLAP, Parquet integration |
| SQLite | `[sqlite]` | Yes | Yes | WAL mode, no extra dependency |
| Elasticsearch | `[elastic]` | Yes | Yes | Scroll API source, bulk indexing sink |
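The CDC connectors are source-only: they tail the database's replication log and emit row-change events. A hedged sketch streaming PostgreSQL changes into Kafka; the stage names `PostgresCDCSource` and `KafkaSink` and the slot/publication parameters are assumptions following the naming pattern of the Examples section, not documented on this page:

```python
from pipelinecast import Pipeline, stages

pipeline = Pipeline("pg-cdc")

pipeline.source(
    stages.PostgresCDCSource(      # assumed name; logical replication via pgoutput
        dsn="postgresql://repl:secret@db/warehouse",
        slot="pipelinecast_slot",  # replication slot (assumed parameter)
        publication="app_pub",     # pgoutput publication (assumed parameter)
    )
).sink(
    stages.KafkaSink(              # assumed name; requires the [kafka] extra
        brokers="kafka:9092",
        topic="warehouse.changes",
    )
)

pipeline.run()
```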

Cloud Storage

| Connector | Extra | Source | Sink | Notes |
|---|---|---|---|---|
| Amazon S3 | `[aws]` | Yes | Yes | SQS notifications, multipart upload |
| Google Cloud Storage | `[gcp]` | Yes | Yes | Pub/Sub notifications, resumable uploads |
| Azure Blob Storage | `[azure]` | Yes | Yes | Event Grid triggers, append blobs |
| MinIO / S3-compatible | `[aws]` | Yes | Yes | Custom `endpoint_url` |
| Local Filesystem | — | Yes | Yes | Glob patterns, file watching (inotify) |
| SFTP | `[sftp]` | Yes | Yes | Key auth, directory polling |
| HDFS | `[hdfs]` | Yes | Yes | WebHDFS REST API |
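The table notes that MinIO and other S3-compatible stores reuse the `[aws]` connector with a custom `endpoint_url`. A sketch, assuming `endpoint_url` is accepted directly by `S3Source` and that a `FileSink` stage exists for the Local Filesystem connector (both assumptions, not confirmed here):

```python
from pipelinecast import Pipeline, stages

pipeline = Pipeline("minio-reader")

pipeline.source(
    stages.S3Source(
        bucket="backups",
        prefix="2026/",
        endpoint_url="http://minio:9000",  # point the [aws] connector at MinIO
        region="us-east-1",
    )
).sink(
    stages.FileSink(path="/tmp/backups")   # assumed name for the filesystem sink
)

pipeline.run()
```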

Formats (Transforms)

| Format | Extra | Parse | Serialize | Notes |
|---|---|---|---|---|
| JSON / JSON Lines | — | Yes | Yes | Built-in, orjson acceleration optional |
| Apache Avro | `[avro]` | Yes | Yes | Schema Registry support |
| Apache Parquet | `[parquet]` | Yes | Yes | Column pruning, predicate pushdown |
| CSV / TSV | — | Yes | Yes | Built-in, configurable delimiter/quoting |
| Protocol Buffers | `[protobuf]` | Yes | Yes | Compiled descriptor files |
| MessagePack | `[msgpack]` | Yes | Yes | Binary, compact encoding |
| Apache Arrow IPC | `[arrow]` | Yes | Yes | Zero-copy, streaming batches |
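Format stages sit in the `.transform()` step of a pipeline. A hypothetical sketch that re-encodes CSV pulled from SFTP as Parquet on S3; the stage names `CsvParse` and `ParquetSerialize` are assumed by analogy with `JsonParse` and `AvroDeserialize` (only those two appear in the Examples section), as is `SftpSource`:

```python
from pipelinecast import Pipeline, stages

pipeline = Pipeline("csv-to-parquet")

pipeline.source(
    stages.SftpSource(                 # assumed name; requires the [sftp] extra
        host="sftp.partner.example.com",
        path="/exports/*.csv",
    )
).transform(
    stages.CsvParse(delimiter=","),    # assumed name for the built-in CSV parser
    stages.ParquetSerialize(),         # assumed name; requires the [parquet] extra
).sink(
    stages.S3Sink(                     # assumed name, mirroring S3Source
        bucket="data-lake",
        prefix="partner-exports/",
    )
)

pipeline.run()
```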

HTTP

| Connector | Extra | Source | Sink | Notes |
|---|---|---|---|---|
| Webhook Receiver | — | Yes | — | Built-in HTTP server, signature verification |
| REST Polling | — | Yes | — | Configurable interval, cursor pagination |
| HTTP POST | — | — | Yes | Batched POST, retry with backoff |
| Server-Sent Events | — | Yes | — | Auto-reconnect, last-event-id |
| WebSocket | `[websocket]` | Yes | Yes | `wss://`, ping/pong keepalive |
| GraphQL Subscriptions | `[websocket]` | Yes | — | graphql-ws protocol |
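The polling source and POST sink pair naturally for mirroring an upstream API. A hedged sketch; the stage names `RestPollingSource` and `HttpPostSink` and every parameter shown are assumptions, not documented on this page:

```python
from pipelinecast import Pipeline, stages

pipeline = Pipeline("api-mirror")

pipeline.source(
    stages.RestPollingSource(   # assumed name; polls on a fixed interval
        url="https://api.example.com/v1/items",
        interval=30,            # seconds between polls (assumed parameter)
        cursor_param="after",   # cursor pagination (assumed parameter)
    )
).sink(
    stages.HttpPostSink(        # assumed name; batched POST with retry/backoff
        url="https://internal.example.com/ingest",
    )
)

pipeline.run()
```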

Examples

Kafka to PostgreSQL

```python
from pipelinecast import Pipeline, stages

pipeline = Pipeline("kafka-to-pg")

pipeline.source(
    stages.KafkaSource(
        brokers="kafka:9092",
        topic="orders",
        group="pg-sink-v1",
    )
).transform(
    stages.AvroDeserialize(registry="http://schema-registry:8081"),
    stages.Filter(lambda r: r["total"] > 0),
).sink(
    stages.PostgresSink(
        dsn="postgresql://app:secret@db/warehouse",
        table="orders",
        batch_size=500,
    )
)

pipeline.run(workers=4)
```

S3 Parquet to ClickHouse

```python
from pipelinecast import Pipeline, stages

pipeline = Pipeline("s3-to-clickhouse")

pipeline.source(
    stages.S3Source(
        bucket="data-lake",
        prefix="events/2026/01/",
        format="parquet",
        region="us-east-1",
    )
).transform(
    stages.Select("event_id", "user_id", "action", "ts"),
    stages.Cast({"ts": "datetime"}),
).sink(
    stages.ClickHouseSink(
        host="clickhouse:9000",
        table="events_raw",
        batch_size=10000,
    )
)

pipeline.run()
```

Webhook to Redis Streams

```python
import time

from pipelinecast import Pipeline, stages

pipeline = Pipeline("webhooks")

pipeline.source(
    stages.WebhookSource(
        host="0.0.0.0",
        port=8080,
        path="/ingest",
        verify_signature="sha256",
        secret_env="WEBHOOK_SECRET",
    )
).transform(
    stages.JsonParse(),
    # Timestamp each record on arrival; import time at module level rather
    # than via the __import__ builtin inside the lambda.
    stages.AddField("received_at", lambda r: time.time()),
).sink(
    stages.RedisSink(
        url="redis://localhost:6379",
        stream="incoming-webhooks",
        maxlen=100000,
    )
)

pipeline.run()
```