Kafka Avro consumer API
Qlik Replicate supports writing messages to Kafka both in JSON format and in Apache Avro format. While JSON format is very easy to read, it is also rather verbose and slow to process. Apache Avro format, on the other hand, is extremely compact and can be processed very fast. However, using Avro for streaming presents a challenge in that Avro messages cannot be interpreted without the schema with which they were created. Moreover, Avro messages are usually encapsulated in a special "envelope" that points the processing agent to the correct schema for interpreting the message.
The advantages of Avro format over other plain formats such as JSON can be summed up as follows:
- More efficient use of serialization space
- Relatively easy to consume programmatically
- Support for long-term schema evolution, which is important for data schemas that are likely to change over time
To ease the task of processing Avro messages, Qlik provides the Attunity Avro Message Decoder SDK, which contains the following:
- Avro message deserializer
- Message schema integration hooks
- Working samples
All Replicate message types covered in this section are encapsulated in a single message schema called the Replicate envelope. The schema of the Replicate envelope is as follows:
{
  "type":"record",
  "name":"MessageEnvelope",
  "fields":[
    {"name":"magic","type":{"type":"fixed","name":"Magic","size":5}},
    {"name":"type","type":"string"},
    {"name":"headers","type":["null",{"type":"map","values":"string"}]},
    {"name":"messageSchemaId","type":["null","string"]},
    {"name":"messageSchema","type":["null","string"]},
    {"name":"message","type":"bytes"}
  ]
}
The fields in the envelope are as follows:
- magic (5 bytes fixed field): The constant "atMSG" identifies this form of message. Consumers should check that this field contains "atMSG" to validate that the message is indeed a Replicate envelope message.
- type (string field): Describes the enveloped message type. This can be one of two values: MD, which stands for metadata message, or DT, which stands for data message.
- headers (map of string key and value): A free-for-use map for various properties set at the application level. Currently, no headers are set by Qlik Replicate, but this may change in future versions.
- messageSchemaId (null or string): A reference to a schema defined elsewhere, which can be used to deserialize the bytes in the message field. This specification does not define how the schema ID is used to look up the actual schema - that is an application-level detail. This field and the messageSchema field are mutually exclusive: exactly one of them is set in any given message.
- messageSchema (null or string): An embedded UTF-8 encoded Avro JSON schema with which the message field can be decoded. This field is mutually exclusive with the messageSchemaId field.
- message (bytes): An Avro-encoded message, which is the payload of the message envelope.
Given the envelope schema, it is possible for anyone using this schema to properly decode the envelope messages from Kafka.
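For illustration, here is a minimal sketch of envelope decoding in Java with the Apache Avro library (org.apache.avro). The class name and error handling are illustrative only - the Attunity Avro Message Decoder SDK ships its own deserializer - and the schema constant is simply the envelope schema shown above:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class EnvelopeDecoder {

    // The Replicate envelope schema shown above, embedded as a constant.
    private static final Schema ENVELOPE_SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"MessageEnvelope\",\"fields\":["
        + "{\"name\":\"magic\",\"type\":{\"type\":\"fixed\",\"name\":\"Magic\",\"size\":5}},"
        + "{\"name\":\"type\",\"type\":\"string\"},"
        + "{\"name\":\"headers\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}]},"
        + "{\"name\":\"messageSchemaId\",\"type\":[\"null\",\"string\"]},"
        + "{\"name\":\"messageSchema\",\"type\":[\"null\",\"string\"]},"
        + "{\"name\":\"message\",\"type\":\"bytes\"}]}");

    private static final GenericDatumReader<GenericRecord> ENVELOPE_READER =
        new GenericDatumReader<>(ENVELOPE_SCHEMA);

    // Decodes the raw value of one Kafka record into an envelope record,
    // validating the "atMSG" magic constant along the way.
    public static GenericRecord decodeEnvelope(byte[] kafkaValue) throws IOException {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(kafkaValue, null);
        GenericRecord envelope = ENVELOPE_READER.read(null, decoder);

        GenericFixed magic = (GenericFixed) envelope.get("magic");
        String magicString = new String(magic.bytes(), StandardCharsets.US_ASCII);
        if (!"atMSG".equals(magicString)) {
            throw new IllegalArgumentException("Not a Replicate envelope: bad magic " + magicString);
        }
        return envelope;
    }
}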
Once the envelope message has been decoded, there are two possible scenarios:
- Scenario 1: Decoding a self-describing message such as the metadata message
- Scenario 2: Decoding a message by referenced schema ID such as data messages
The method for logically decoding messages in both scenarios is described below.
Decoding a self-describing message
When the messageSchema field is not null, the message field can be decoded using the schema included in the messageSchema field. This is fairly straightforward to do programmatically, since all that Avro usually requires is a schema and a message, both of which are provided in the envelope message.
Replicate metadata messages, which include table metadata, lineage, and the data schema description (to be referenced later by data messages), are enveloped in the self-describing envelope.
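Building on the hypothetical EnvelopeDecoder above, a minimal sketch of the self-describing case might look as follows; the class and method names are illustrative:

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class SelfDescribingDecoder {

    // Decodes the payload of an envelope whose messageSchema field is non-null,
    // e.g. a metadata (MD) message.
    public static GenericRecord decode(GenericRecord envelope) throws IOException {
        Object embeddedSchema = envelope.get("messageSchema");
        if (embeddedSchema == null) {
            throw new IllegalArgumentException("Envelope does not embed a schema");
        }

        // Parse the embedded Avro JSON schema and use it to decode the payload bytes.
        Schema messageSchema = new Schema.Parser().parse(embeddedSchema.toString());
        ByteBuffer payload = (ByteBuffer) envelope.get("message");

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
            payload.array(), payload.arrayOffset() + payload.position(),
            payload.remaining(), null);
        return new GenericDatumReader<GenericRecord>(messageSchema).read(null, decoder);
    }
}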
Decoding a message by referenced schema ID
Avro schemas are JSON documents which can be quite large, usually much larger than the Avro-encoded data that conforms to them. For example, the schema of a 10-column table could be a JSON document of more than 100 characters, while an actual encoding of a 10-column row may be only 10 bytes (depending, of course, on the type and length of the fields). It is therefore not recommended to include the schema with each data message in Kafka: the schema is identical for all data messages, and only the data itself differs between them.
To avoid sending the schema with each data message, each schema is assigned a 32-byte ID. When a data message is constructed from a data schema that was previously sent (via a metadata message), the messageSchema field is set to null and the messageSchemaId field is set to the schema's 32-byte ID instead. It is the application's responsibility to locate the data schema sent earlier in the metadata message and use it to decode the payload contained in the message field.
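As a minimal sketch of this lookup, assuming a simple in-memory map serves as the data schemas zone (a real consumer might use a shared persistent store instead), with all names illustrative:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class DataSchemaCache {

    // The "data schemas zone": schema ID -> parsed data schema.
    private final ConcurrentMap<String, Schema> schemasById = new ConcurrentHashMap<>();

    // Called when a metadata message is consumed: store the data schema under its ID.
    public void register(String schemaId, String schemaJson) {
        schemasById.put(schemaId, new Schema.Parser().parse(schemaJson));
    }

    // Decodes a data message whose envelope references a previously registered schema ID.
    public GenericRecord decodeDataMessage(GenericRecord envelope) throws IOException {
        String schemaId = envelope.get("messageSchemaId").toString();
        Schema schema = schemasById.get(schemaId);
        if (schema == null) {
            // The corresponding metadata message has not been consumed yet;
            // see the consumer logic discussion below.
            throw new IllegalStateException("Unknown schema ID: " + schemaId);
        }

        ByteBuffer payload = (ByteBuffer) envelope.get("message");
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
            payload.array(), payload.arrayOffset() + payload.position(),
            payload.remaining(), null);
        return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    }
}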
Typical consumer logic
In a typical Kafka scenario, Qlik Replicate acts as the producer of messages and customer code acts as the consumer. Qlik Replicate offers the ability to define a specific topic as the schema topic and different topics for the table data.
The customer's consumer code should read metadata messages from the schema topic and then save the data schemas, and any other information the consumer wishes to access later, in a customer-defined zone. Another set of customer consumers should read data messages from the various data topics and access the data schemas zone as needed to retrieve the schemas required for decoding the data messages.
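As a sketch of the metadata side, a consumer might poll the schema topic with the standard Kafka client and populate the hypothetical DataSchemaCache above. The broker address, topic name, and the way the schema ID and schema JSON are extracted from the decoded metadata message are all assumptions here; the actual metadata message layout is described by its own embedded schema.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MetadataTopicConsumer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "metadata-consumers");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        DataSchemaCache cache = new DataSchemaCache();

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("replicate-schemas")); // assumed topic name
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    GenericRecord envelope = EnvelopeDecoder.decodeEnvelope(record.value());
                    if ("MD".equals(envelope.get("type").toString())) {
                        // Metadata messages are self-describing (see above).
                        GenericRecord metadata = SelfDescribingDecoder.decode(envelope);
                        registerDataSchema(cache, metadata);
                    }
                }
            }
        }
    }

    private static void registerDataSchema(DataSchemaCache cache, GenericRecord metadata) {
        // Hypothetical helper: pull the data schema ID and schema JSON out of the
        // decoded metadata record and call cache.register(id, json). The field
        // names depend on the metadata message schema, which is not shown here.
    }
}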
When consuming data messages and metadata messages from several topics and partitions in a multi-threaded or multi-process manner, a consumer may attempt to read a data message before the corresponding metadata message has been read. Because a data message cannot be decoded without its corresponding metadata message, the consumer's logic should wait a reasonable amount of time for the metadata message to arrive. If the metadata message is still not available after that wait, the consumer should treat this as an unexpected error and activate its planned error policy. An example of such a policy could be saving the message in a dedicated “delayed” topic for later processing.
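One possible shape for that logic, as a sketch with illustrative retry values, building on the hypothetical DataSchemaCache above:

import org.apache.avro.generic.GenericRecord;

public class DataMessageHandler {

    private static final long WAIT_MILLIS_PER_ATTEMPT = 500; // illustrative value
    private static final int MAX_ATTEMPTS = 10;              // illustrative value

    private final DataSchemaCache cache;

    public DataMessageHandler(DataSchemaCache cache) {
        this.cache = cache;
    }

    // Tries to decode a data message, waiting briefly for the corresponding
    // metadata message; escalates to the planned error policy if it never arrives.
    public GenericRecord handle(GenericRecord envelope) throws Exception {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                return cache.decodeDataMessage(envelope);
            } catch (IllegalStateException schemaNotYetKnown) {
                // The metadata consumer may simply not have caught up yet.
                Thread.sleep(WAIT_MILLIS_PER_ATTEMPT);
            }
        }
        // Still unknown after a reasonable wait: unexpected error.
        sendToDelayedTopic(envelope);
        return null;
    }

    private void sendToDelayedTopic(GenericRecord envelope) {
        // Hypothetical hook: re-publish the raw message to a dedicated
        // "delayed" topic for later processing.
    }
}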
As a rule of thumb, the number of metadata messages will be much lower than the number of data messages (on the order of 1:10,000 or more). So, assuming a metadata consumer is active, the gap between a metadata message and its corresponding data messages should be no more than a few seconds (usually, milliseconds).