Benthos is a high performance and resilient stream processor, able to connect various sources and sinks in a range of brokering patterns and perform hydration, enrichments, transformations and filters on payloads.
It comes with a powerful mapping language, is easy to deploy and monitor, and is ready to drop into your pipeline as a static binary, docker image, or serverless function, making it cloud native as heck.
Benthos is fully declarative, with stream pipelines defined in a single config file, allowing you to specify connectors and a list of processing stages:
input:
  gcp_pubsub:
    project: foo
    subscription: bar

pipeline:
  processors:
    - bloblang: |
        root.message = this
        root.meta.link_count = this.links.length()
        root.user.age = this.user.age.number()

output:
  redis_streams:
    url: tcp://TODO:6379
    stream: baz
    max_in_flight: 20
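To make the mapping in the config above concrete, here is a rough Python sketch of what its three assignments compute for one JSON document. The sample document and the `apply_mapping` helper are hypothetical, and `float()` only approximates Bloblang's `number()` method; this is an illustration, not how Benthos evaluates mappings.

```python
import copy


def apply_mapping(doc):
    """Approximate the three Bloblang assignments for one parsed JSON document."""
    return {
        # root.message = this
        "message": copy.deepcopy(doc),
        # root.meta.link_count = this.links.length()
        "meta": {"link_count": len(doc["links"])},
        # root.user.age = this.user.age.number()
        "user": {"age": float(doc["user"]["age"])},
    }


# Hypothetical input document, chosen only for illustration.
sample = {
    "links": ["https://example.com/a", "https://example.com/b"],
    "user": {"name": "fooey", "age": "27"},
}

result = apply_mapping(sample)
print(result["meta"]["link_count"])  # 2
print(result["user"]["age"])         # 27.0
```

Note that the original document ends up nested under `message`, while `meta` and `user` are new top-level fields built from it.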
Delivery Guarantees
Yep, we got 'em. Benthos implements transaction-based resiliency with back pressure. When connecting to at-least-once sources and sinks, it guarantees at-least-once delivery without needing to persist messages during transit.
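The idea behind this kind of guarantee can be sketched in a few lines of Python: a message is acknowledged to the source only after the sink has accepted it, so a failed write leads to redelivery rather than loss. Everything here (`InMemorySource`, `FlakySink`, `run`) is hypothetical and deliberately simplified to one message in flight; it is not Benthos's implementation.

```python
from collections import deque


class InMemorySource:
    """Hypothetical at-least-once source: unacknowledged messages are redelivered."""

    def __init__(self, payloads):
        self.pending = deque(payloads)
        self.acked = []

    def read(self):
        # The head of the queue is handed out again until it is acknowledged.
        return self.pending[0] if self.pending else None

    def ack(self):
        self.acked.append(self.pending.popleft())


class FlakySink:
    """Hypothetical sink whose first `failures` writes raise an error."""

    def __init__(self, failures):
        self.failures = failures
        self.written = []

    def write(self, payload):
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("sink unavailable")
        self.written.append(payload)


def run(source, sink, process=lambda payload: payload):
    # Only one message is in flight, so a slow or failing sink stalls
    # reads from the source: a crude form of back pressure.
    while (payload := source.read()) is not None:
        try:
            sink.write(process(payload))
        except ConnectionError:
            continue  # not acknowledged, so the same message is read again
        source.ack()  # acknowledge only after the sink accepted the write


source = InMemorySource(["a", "b", "c"])
sink = FlakySink(failures=2)
run(source, sink)
print(sink.written)  # ['a', 'b', 'c']
```

Nothing is buffered or persisted between the two ends in this sketch; the source itself remains the record of what has not yet been delivered.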
Supported Sources & Sinks
AWS (DynamoDB, Kinesis, S3, SQS, SNS), Azure (Blob storage, Queue storage, Table storage), Cassandra, Elasticsearch, File, GCP (pub/sub), HDFS, HTTP (server and client, including websockets), Kafka, Memcached, MQTT, Nanomsg, NATS, NATS Streaming, NSQ, AMQP 0.91 (RabbitMQ), AMQP 1, Redis (streams, list, pubsub, hashes), SQL (MySQL, PostgreSQL, Clickhouse), Stdin/Stdout, TCP & UDP, sockets and ZMQ4.
Connectors are being added constantly. If something you want is missing, then open an issue.
Grab a binary for your OS from here. Or use this script:
curl -Lsf https://sh.benthos.dev | bash
Or pull the docker image:
docker pull jeffail/benthos
Benthos can also be installed via Homebrew:
brew install benthos
For more information check out the getting started guide.
benthos -c ./config.yaml
Or, with docker:
# Send HTTP /POST data to Kafka:
docker run --rm \
  -e "INPUT_TYPE=http_server" \
  -e "OUTPUT_TYPE=kafka" \
  -e "OUTPUT_KAFKA_ADDRESSES=kafka-server:9092" \
  -e "OUTPUT_KAFKA_TOPIC=benthos_topic" \
  -p 4195:4195 \
  jeffail/benthos

# Using your own config file:
docker run --rm -v /path/to/your/config.yaml:/benthos.yaml jeffail/benthos
Benthos serves two HTTP endpoints for health checks:
/ping can be used as a liveness probe as it always returns a 200.
/ready can be used as a readiness probe as it serves a 200 only when both the input and output are connected; otherwise a 503 is returned.
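As a minimal sketch of how a script might consume these two endpoints, the Python helpers below return the status code of a probe and interpret `/ready`. The `probe` and `is_ready` names are hypothetical, and the base URL is an assumption: port 4195 is taken from the docker examples in this document, so adjust it to wherever your instance serves HTTP.

```python
import urllib.error
import urllib.request


def probe(base_url, path, timeout=2.0):
    """Return the HTTP status code for base_url + path, or None if unreachable."""
    try:
        with urllib.request.urlopen(base_url + path, timeout=timeout) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        # Non-2xx responses (such as a 503 from /ready) arrive as HTTPError.
        return err.code
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, DNS failure and similar.
        return None


def is_ready(base_url):
    """True only when /ready answers 200, i.e. input and output are connected."""
    return probe(base_url, "/ready") == 200
```

For example, `probe("http://localhost:4195", "/ping")` checks liveness and `is_ready("http://localhost:4195")` checks readiness, assuming the instance is reachable at that address.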
Benthos provides lots of tools for making configuration discovery, debugging and organisation easy. You can read about them here.
It is possible to select fields inside a configuration file to be set via environment variables. The docker image, for example, is built with a config file where all common fields can be set this way.
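The general mechanism can be illustrated with a short Python sketch that substitutes placeholders in config text with values from the environment. The `${NAME:default}` placeholder notation, the `interpolate` helper, and the template text are assumptions made for this illustration; check the Benthos configuration docs for the exact syntax it supports.

```python
import os
import re

# Matches ${NAME} or ${NAME:default}; the notation is assumed for this sketch.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::([^}]*))?\}")


def interpolate(text, env=None):
    """Replace placeholders in text using env (defaults to os.environ)."""
    env = os.environ if env is None else env

    def replace(match):
        name, default = match.group(1), match.group(2)
        # Fall back to the inline default, or an empty string if none is given.
        return env.get(name, default if default is not None else "")

    return _PLACEHOLDER.sub(replace, text)


template = "url: tcp://${REDIS_HOST:localhost}:6379"  # hypothetical config line
print(interpolate(template, env={}))                       # url: tcp://localhost:6379
print(interpolate(template, env={"REDIS_HOST": "redis"}))  # url: tcp://redis:6379
```

Keeping a default inside the placeholder means the same config file works unchanged both locally and in a container where the variable is set.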
Build with Go (1.15 or later):
git clone git@github.com:Jeffail/benthos
cd benthos
make
Benthos uses golangci-lint for linting, which you can install with:
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.35.2
And then run it with
It's pretty easy to write your own custom plugins for Benthos. Take a look at this repo for examples and build instructions.
There's a multi-stage Dockerfile for creating a Benthos docker image which results in a minimal image from scratch. You can build it with:
Then use the image:
docker run --rm \
  -v /path/to/your/benthos.yaml:/config.yaml \
  -v /tmp/data:/data \
  -p 4195:4195 \
  benthos -c /config.yaml
There are a few examples here that show you some ways of setting up Benthos containers using
Benthos supports ZMQ4 for both data input and output. To add this, you need to install libzmq4 and use the compile-time flag when building Benthos:
Or to build a docker image using CGO, which includes ZMQ: