MQTT to Kafka Bridge

This is an extremely primitive MQTT to Kafka bridge, consisting of around 100 lines of code. It is not optimized for extensibility, performance, or stability and should not be used in a production environment. All it does is read messages from an MQTT broker, replace the topic separators (e.g. home/outside/humidity -> home.outside.humidity), and forward each message to Kafka.
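The topic rewrite described above can be sketched in a few lines of Java. This is a minimal illustration and not the bridge's actual source; the class name `TopicMapper` and the method `toKafkaTopic` are hypothetical.

```java
// Hypothetical helper illustrating the topic rewrite; not taken from the bridge's source.
class TopicMapper {

    // Replaces every occurrence of the MQTT separator with the Kafka separator.
    // String.replace performs a literal (non-regex) replacement, so "." and "/" need no escaping.
    static String toKafkaTopic(String mqttTopic, String mqttSeparator, String kafkaSeparator) {
        return mqttTopic.replace(mqttSeparator, kafkaSeparator);
    }

    public static void main(String[] args) {
        // Uses the default separators from the configuration table.
        System.out.println(toKafkaTopic("home/outside/humidity", "/", "."));
        // prints "home.outside.humidity"
    }
}
```

A literal replacement is used here rather than `replaceAll`, because the default Kafka separator `.` would otherwise be interpreted as a regular expression.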

Configuration

| Environment Variable | Default value | Description |
| --- | --- | --- |
| CLIENT_ID | Mqtt2KafkaBridge | Client ID used to connect to MQTT and Kafka broker. |
| KAFKA_BROKER_HOST | localhost:9092 | Host and port of your Kafka broker. |
| KAFKA_TOPIC_SEPARATOR | . | The topic separator used for Kafka topics, it replaces MQTT_TOPIC_SEPARATOR. |
| MQTT_BROKER_HOST | localhost:1883 | Host and port of your MQTT broker. |
| MQTT_BROKER_USER | | Username used to connect to MQTT broker. |
| MQTT_BROKER_PASSWORD | | Password used to connect to MQTT broker. |
| MQTT_AUTOMATIC_RECONNECT | true | Whether or not the MQTT client should reconnect automatically after connection to broker is lost. |
| MQTT_TOPIC_FILTER | # | The MQTT topic filter that we subscribe to, by default all messages. |
| MQTT_TOPIC_SEPARATOR | / | The topic separator used for MQTT topics, it is replaced by KAFKA_TOPIC_SEPARATOR. |
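
As an illustration of how such settings are typically resolved, the following sketch reads a few of the variables above and falls back to the documented defaults. It is not the bridge's actual code: the class `BridgeConfig` and the method `get` are hypothetical, and treating an empty value like an unset one is an assumption.

```java
import java.util.Map;

// Hypothetical configuration lookup; names and behaviour are assumptions, not the bridge's source.
class BridgeConfig {

    // Returns the variable's value, or the fallback when it is unset or empty (assumed behaviour).
    static String get(Map<String, String> env, String name, String fallback) {
        String value = env.get(name);
        return (value == null || value.isEmpty()) ? fallback : value;
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();

        // Defaults are copied from the configuration table.
        String kafkaHost = get(env, "KAFKA_BROKER_HOST", "localhost:9092");
        String mqttHost = get(env, "MQTT_BROKER_HOST", "localhost:1883");
        String topicFilter = get(env, "MQTT_TOPIC_FILTER", "#");
        boolean reconnect = Boolean.parseBoolean(get(env, "MQTT_AUTOMATIC_RECONNECT", "true"));

        System.out.println("Kafka broker: " + kafkaHost);
        System.out.println("MQTT broker:  " + mqttHost);
        System.out.println("Topic filter: " + topicFilter);
        System.out.println("Reconnect:    " + reconnect);
    }
}
```

Passing the environment in as a `Map` keeps the lookup easy to exercise with test values instead of the real process environment.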

Build

./gradlew build

Run

java -jar build/libs/iot-home.mqtt2kafkabridge-1.0.jar

Run with custom configuration

export KAFKA_BROKER_HOST=kafka:9092;
export MQTT_BROKER_HOST=mqtt:1883;
java -jar build/libs/iot-home.mqtt2kafkabridge-1.0.jar;

Docker

Docker Hub: https://hub.docker.com/r/marmaechler/mqtt2kafkabridge

Docker compose example

version: "3"
services:
  mqtt2kafkabridge:
    image: marmaechler/mqtt2kafkabridge:latest
    environment:
      KAFKA_BROKER_HOST: example.com:9092
      KAFKA_TOPIC_SEPARATOR: _
      MQTT_TOPIC_FILTER: home/#
    volumes:
      - ./mqtt2kafkabridge/logs:/opt/mqtt2kafkabridge/logs
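
To make the effect of `MQTT_TOPIC_FILTER: home/#` together with `KAFKA_TOPIC_SEPARATOR: _` concrete, here is a small sketch of MQTT wildcard matching followed by the separator rewrite. In practice the MQTT broker evaluates the filter, not the bridge, so this is for illustration only. The class `TopicFilter` and its method are hypothetical, and the sketch is simplified: it does not validate that `#` is the last level and ignores the special handling of topics starting with `$`.

```java
// Hypothetical illustration of MQTT topic filter matching; the broker does this in practice.
class TopicFilter {

    // Returns true if the topic matches the filter.
    // "+" matches exactly one level, "#" matches all remaining levels (including none).
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // multi-level wildcard covers the rest of the topic
            }
            if (i >= t.length) {
                return false; // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level differs
            }
        }
        return f.length == t.length; // no wildcard tail: level counts must agree
    }

    public static void main(String[] args) {
        String filter = "home/#";
        for (String topic : new String[] {"home/outside/humidity", "garden/temperature"}) {
            if (matches(filter, topic)) {
                // Forwarded with "/" replaced by the configured Kafka separator "_".
                System.out.println(topic + " -> " + topic.replace("/", "_"));
            } else {
                System.out.println(topic + " is not matched by " + filter);
            }
        }
    }
}
```

With the configuration above, a message on home/outside/humidity would end up on the Kafka topic home_outside_humidity, while garden/temperature would never reach the bridge.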

Docker compose example integration

version: "3"
services:
  hivemq:
    build: hivemq
    restart: always
    ports:
      - 1883:1883
      - 8080:8080
      - 8000:8000
  zookeeper:
    image: bitnami/zookeeper:latest
    restart: always
    environment:
      ALLOW_ANONYMOUS_LOGIN: 'yes'
    volumes:
      - ./zookeeper/persistence:/bitnami/zookeeper
  kafka:
    image: bitnami/kafka:latest
    depends_on:
      - zookeeper
    restart: always
    ports:
      - 9092:9092
      - 29092:29092
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      # see https://rmoff.net/2018/08/02/kafka-listeners-explained/
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
    volumes:
      - ./kafka/persistence:/bitnami/kafka
  mqtt2kafkabridge:
    image: marmaechler/mqtt2kafkabridge:latest
    depends_on:
      - kafka
      - hivemq
    restart: always
    environment:
      KAFKA_BROKER_HOST: kafka:9092
      MQTT_BROKER_HOST: hivemq:1883
    volumes:
      - ./mqtt2kafkabridge/logs:/opt/mqtt2kafkabridge/logs