Connector

A standalone bridge application between MQTT and Kafka.

Application Details

MQTT is the go-to protocol for IoT devices due to its low protocol overhead. However, it was not built for data integration and data processing. In those scenarios, developers prefer Kafka or a similar event-streaming platform to build an event-driven architecture and process data in motion.

To be clear, MQTT and Kafka complement each other. So how do we easily pass a huge volume of events from MQTT to Kafka? This is where Connector comes in.

Connector is a highly scalable, fault-tolerant application that continuously pulls events from MQTT brokers and publishes them to air-gapped Kafka clusters with minimal latency. Depending on configuration, it can also tolerate failures: when the application crashes or the Kafka cluster is offline, messages are stored locally and published as soon as the cluster comes back online.

Deployment

To deploy this project, we need:

  • MQTT Instance
  • ZooKeeper Instance
  • Kafka Instance

Let's deploy the MQTT broker:

    docker run --rm --net=host --name mosquitto -p 1883:1883  -p 9001:9001 eclipse-mosquitto mosquitto -c /mosquitto-no-auth.conf

ZooKeeper

    docker run --rm --net=host --name zookeeper -p 2181:2181 zookeeper

Find the ZooKeeper IP address using docker inspect:

    docker inspect zookeeper

Deploy the Kafka cluster, updating the environment variable KAFKA_ZOOKEEPER_CONNECT as necessary:

    docker run --rm --net=host --name kafka -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=172.17.0.1:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka
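
Before moving on, it can be worth verifying that the broker is reachable. Below is a minimal sketch of a connectivity check using the segmentio/kafka-go client; the package choice and the check itself are illustrative additions, not part of this project.

    package main

    import (
        "context"
        "log"
        "time"

        "github.com/segmentio/kafka-go"
    )

    func main() {
        // Dial the broker started above and fail fast if Kafka is not up yet.
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()

        conn, err := kafka.DialContext(ctx, "tcp", "localhost:9092")
        if err != nil {
            log.Fatalf("kafka broker unreachable: %v", err)
        }
        defer conn.Close()

        brokers, err := conn.Brokers()
        if err != nil {
            log.Fatalf("failed to list brokers: %v", err)
        }
        log.Printf("kafka is up with %d broker(s)", len(brokers))
    }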

Deploy the Connector application after setting the required environment variables (described later):

    docker run --rm --net=host --name connector \
      -e RUN_MODE=PRODUCTION \
      -e MQTT_BROKER_URL=127.0.0.1 \
      -e MQTT_BROKER_PORT=1883 \
      -e MQTT_CLIENT_ID=arafat \
      -e MQTT_USER_NAME=user \
      -e MQTT_USER_PASSWORD=12345678 \
      -e LOG_LVL=info \
      -e TOPIC_LIST=default:default-kafka,custom:custom-kafka,dev_arafat:dev_arafat \
      -e WILDCARD_TOPIC_LIST="default/+/mqtt:custom-kafka,default/kc/#:general" \
      -e INVOKE_LATENCY_SEC=10 \
      -e INVOKE_MAX_GOROUTINE=1000 \
      -e INVOKE_MIN_GOROUTINE=50 \
      -e KAFKA_BROKER=localhost:9092 \
      -e SASL_SSL=false \
      arafatkhan/connector
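
Once the container is up, a quick smoke test is to publish one message to a mapped MQTT topic and confirm it lands in the corresponding Kafka topic. Here is a minimal sketch using the eclipse/paho.mqtt.golang client; the topic and credentials simply mirror the values passed to the container above, and the snippet itself is illustrative rather than part of the project.

    package main

    import (
        "log"
        "time"

        mqtt "github.com/eclipse/paho.mqtt.golang"
    )

    func main() {
        opts := mqtt.NewClientOptions().
            AddBroker("tcp://127.0.0.1:1883").
            SetClientID("smoke-test").
            SetUsername("user").
            SetPassword("12345678")

        client := mqtt.NewClient(opts)
        if token := client.Connect(); token.Wait() && token.Error() != nil {
            log.Fatalf("mqtt connect failed: %v", token.Error())
        }
        defer client.Disconnect(250)

        // "default" is mapped to the "default-kafka" topic via TOPIC_LIST.
        token := client.Publish("default", 1, false, "hello from mqtt")
        if !token.WaitTimeout(5 * time.Second) {
            log.Fatal("publish timed out")
        }
        if token.Error() != nil {
            log.Fatalf("publish failed: %v", token.Error())
        }
        log.Println("published; check the default-kafka topic")
    }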

Environment Variables

To run this project, you will need to provide the following environment variables (via your .env file in DEVELOP mode, or the process environment in PRODUCTION):

  • RUN_MODE can be DEVELOP or PRODUCTION. If this variable is not set, it defaults to DEVELOP and the required environment variables are loaded from the .env file.

  • MQTT_BROKER_URL URL of the MQTT broker

  • MQTT_BROKER_PORT Port of the MQTT broker

  • MQTT_CLIENT_ID Identity of the MQTT client

  • MQTT_USER_NAME Username, if the broker has authentication configured

  • MQTT_USER_PASSWORD Password, if the broker has authentication configured

  • LOG_LVL Connector uses Zerolog as its logging package, which has seven logging levels: panic, fatal, error, warn, info, debug, trace

  • TOPIC_LIST Comma-separated topic mapping from MQTT to the Kafka cluster. For example, TOPIC_LIST=default:default-kafka,custom:custom-kafka maps IoT events from the default MQTT topic to the default-kafka topic in Kafka, and so on. (A parsing sketch appears after this list.)

  • WILDCARD_TOPIC_LIST MQTT supports wildcard topics, allowing thousands of individual topics to map onto a single wildcard. A topic may match multiple wildcards; in that scenario, Connector pushes the message only to the first mapped Kafka topic and logs a warning. E.g. WILDCARD_TOPIC_LIST="default/+/cars:cars,default/general/#:general"

  • INVOKE_LATENCY_SEC Specify how often DbInvoke should be called. DbInvoke tries to publish all messages stored in the database that previously failed to publish.

  • INVOKE_MAX_GOROUTINE Maximum number of concurrent goroutines used to publish stored messages

  • INVOKE_MIN_GOROUTINE Number of running goroutines at or below which DbInvoke is called to spawn new goroutines

  • KAFKA_BROKER Kafka broker with port, e.g. KAFKA_BROKER=localhost:9092

  • SASL_SSL Connector supports the PLAINTEXT and SASL/OAUTHBEARER protocols. If set to false, it uses PLAINTEXT (default)

  • SASL_OAUTHBEARER_CLIENT_ID If SASL_SSL=true, set the OAuth client ID

  • SASL_OAUTHBEARER_CLIENT_SECRET OAuth client secret

  • SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI OAuth provider's token endpoint URI

E.g. SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI=https://yourdomain.com/auth/realms/myrealm/protocol/openid-connect/token

  • DB_PERSIST_PATH Path for persisting data in SQLite; if this path is not found, Connector switches to in-memory mode.
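
As promised above, here is a hedged sketch of how TOPIC_LIST could be parsed into a map and how an incoming MQTT topic could be matched against a wildcard filter. The function names are illustrative and do not reflect the project's actual internals.

    package main

    import (
        "fmt"
        "os"
        "strings"
    )

    // parseTopicList turns "default:default-kafka,custom:custom-kafka"
    // into {"default": "default-kafka", "custom": "custom-kafka"}.
    func parseTopicList(raw string) map[string]string {
        mapping := make(map[string]string)
        for _, pair := range strings.Split(raw, ",") {
            parts := strings.SplitN(pair, ":", 2)
            if len(parts) == 2 {
                mapping[parts[0]] = parts[1]
            }
        }
        return mapping
    }

    // matchWildcard reports whether an MQTT topic matches a wildcard filter,
    // where "+" matches exactly one level and "#" matches the remainder.
    func matchWildcard(filter, topic string) bool {
        f := strings.Split(filter, "/")
        t := strings.Split(topic, "/")
        for i, level := range f {
            if level == "#" {
                return true
            }
            if i >= len(t) || (level != "+" && level != t[i]) {
                return false
            }
        }
        return len(f) == len(t)
    }

    func main() {
        raw := os.Getenv("TOPIC_LIST")
        if raw == "" {
            raw = "default:default-kafka,custom:custom-kafka"
        }
        topics := parseTopicList(raw)
        fmt.Println(topics["default"])                                      // default-kafka
        fmt.Println(matchWildcard("default/+/cars", "default/berlin/cars")) // true
        fmt.Println(matchWildcard("default/general/#", "default/other/x"))  // false
    }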

Run Locally with Clients

Clone the project

  git clone https://github.com/aiproshar/connector.git

Go to the project directory

  cd connector

Install dependencies

  go mod tidy

Start the application

  go run application.go

Note: Connector requires running MQTT, ZooKeeper, and Kafka instances (see Deployment above).

Run the Kafka subscriber

Go to the subscriber directory

  cd clients/subscriber-kafka

Configure the .env file as necessary (topic list)

Install dependencies

  go mod tidy

Start the subscriber

  go run application.go

Run the MQTT publisher

Go to the publisher directory

  cd clients/publisher-mqtt

Configure the .env file as necessary (topic list, publish_interval, total msg, etc.)

Install dependencies

  go mod tidy

Start the publisher

  go run application.go

Architecture

Connector uses four channels: one to pass incoming events to the publish handler, one to hand events to the database insert handler after a publish failure, a database restore channel to re-publish previously failed messages, and a database delete channel to remove successfully re-published events.
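
The sketch below illustrates how such a four-channel layout could be wired in Go; the channel names, buffer sizes, and handlers are illustrative, not the project's actual code.

    package main

    import (
        "log"
        "time"
    )

    type event struct {
        topic   string
        payload []byte
    }

    var (
        publishCh   = make(chan event, 1024) // incoming MQTT events -> publish handler
        dbInsertCh  = make(chan event, 1024) // failed publishes -> database insert handler
        dbRestoreCh = make(chan event, 1024) // stored messages -> re-publish attempts
        dbDeleteCh  = make(chan event, 1024) // re-published messages -> database delete handler
    )

    // publishHandler pushes incoming events to Kafka and parks failures in the database.
    func publishHandler(publishToKafka func(event) error) {
        for ev := range publishCh {
            if err := publishToKafka(ev); err != nil {
                dbInsertCh <- ev
            }
        }
    }

    // restoreHandler retries previously stored events; successes are queued for deletion.
    func restoreHandler(publishToKafka func(event) error) {
        for ev := range dbRestoreCh {
            if err := publishToKafka(ev); err == nil {
                dbDeleteCh <- ev
            }
        }
    }

    func main() {
        publish := func(ev event) error {
            log.Printf("publish %q to kafka", ev.topic)
            return nil
        }
        go publishHandler(publish)
        go restoreHandler(publish)
        // In the real application, the MQTT subscription feeds publishCh and
        // the periodic DbInvoke feeds dbRestoreCh.
        publishCh <- event{topic: "default", payload: []byte("hello")}
        time.Sleep(100 * time.Millisecond) // let the handler drain the channel
    }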

Let’s have a look at the whole design diagram.

[Design diagram]

License

MIT
