Kafka Source Integration

Apache Kafka is a popular open-source platform for building data processing pipelines. Vantiq offers direct integration with Kafka through sources, which can both consume and produce messages on Kafka topics. For more information on Kafka, see the Apache Kafka documentation.

The basic process for setting up a Kafka Source and reading messages as they arrive on a topic is as follows:

Kafka Source Representation

A source resource defines the integration with a set of Kafka topics and contains the following properties:

The Kafka Broker Configuration Documentation describes many additional configuration values which can also be applied in a Kafka source config. Additional config properties should be defined in the config object the same way as “bootstrap.servers” (see example below). Any config values that should only apply to the consumer should be prefixed with “consumer.” (e.g. “consumer.key.deserializer”), while configs that only apply to producing messages should be prefixed with “producer.” (e.g. “producer.key.serializer”). While this is not currently possible through the Source UI, these config values can be added by manually defining the source and creating it through Data > Add Records or through the CLI.
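For example, a config object that supplies a consumer-only and a producer-only property alongside the required settings might look like the following (“max.poll.records” and “acks” are standard Kafka client settings, shown here with illustrative values):

```json
{
    "bootstrap.servers": "localhost:9092",
    "topics": ["topic1"],
    "consumer.max.poll.records": "100",
    "producer.acks": "all"
}
```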

Create a Kafka Source

The following example illustrates how to create a Kafka source using the REST API. Kafka sources can also be defined in Modelo by using the Add button to select Source… when in Modelo Development Mode.

POST https://dev.vantiq.com/api/v1/resources/sources
{ 
    "name": "myKafkaSource",
    "type": "KAFKA",
    "direction": "BOTH",
    "config": {
        "bootstrap.servers": "localhost:9092",
        "topics": ["topic1", "topic2", "topicX"]
    }
}
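The same request could also be issued from a script. Below is a minimal Python sketch that builds (but does not send) the request; `VANTIQ_TOKEN` is a placeholder for a real access token, and the URL assumes the `dev.vantiq.com` server used above:

```python
import json
import urllib.request

# Definition of the Kafka source, matching the REST example above
source_def = {
    "name": "myKafkaSource",
    "type": "KAFKA",
    "direction": "BOTH",
    "config": {
        "bootstrap.servers": "localhost:9092",
        "topics": ["topic1", "topic2", "topicX"],
    },
}

# Build the POST request; VANTIQ_TOKEN is a placeholder for a real
# access token issued by the Vantiq server.
req = urllib.request.Request(
    "https://dev.vantiq.com/api/v1/resources/sources",
    data=json.dumps(source_def).encode("utf-8"),
    headers={
        "Content-Type": "application/json",
        "Authorization": "Bearer VANTIQ_TOKEN",
    },
    method="POST",
)
# urllib.request.urlopen(req) would then issue the request.
```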

Delete a Kafka Source

The example Kafka source named myKafkaSource can be deleted using the REST API by issuing the following request:

DELETE https://dev.vantiq.com/api/v1/resources/sources/myKafkaSource

Produce Messages on a Kafka Topic

Messages are produced on Kafka topics in VAIL with the PUBLISH command. The PUBLISH request for Kafka sources takes a minimum of three parameters: the value (and optionally the key) to send, the Kafka source, and the topic to which the message is published. For example:

PUBLISH { value: "somevalue", key: "somekey"} TO SOURCE myKafkaSource USING { topic: "topicX" }

Note that in the above example, the key in the published object is optional. To trigger a rule whenever a message is consumed from a Kafka source, use the following rule trigger:

RULE myKafkaRule
WHEN MESSAGE ARRIVES FROM myKafkaSource AS msg
log.info("Received message: {}", [msg])

One important note: by default, keys and values produced to and consumed from Kafka topics are serialized and deserialized as strings. So even if a message written to a Kafka topic outside the Vantiq system has a value in JSON format, when that message is consumed by a Vantiq rule the value will be encoded as a string.
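The implication can be illustrated in Python (used here only to demonstrate the data behavior, since the point is about serialization rather than VAIL itself): a JSON payload arrives as a string and must be parsed explicitly before its fields can be accessed.

```python
import json

# A JSON payload written to the topic by an external producer...
external_value = json.dumps({"temperature": 72, "unit": "F"})

# ...is delivered as a plain string, not a structured object:
assert isinstance(external_value, str)

# The consuming logic must parse it explicitly to recover the fields.
parsed = json.loads(external_value)
assert parsed["temperature"] == 72
```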