Security configuration (optional)
Procedure
- Modify the Bean KafkaAvroSerializerBean inside Kafka_Avro_Beans with the following content (a standalone producer-side sketch that uses this bean follows this procedure):

    package org.example.local_project.beansjar.kafka_avro_beans;

    import java.util.HashMap;
    import java.util.Map;

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;

    public class KafkaAvroSerializerBean extends KafkaAvroSerializer {

        @Override
        public void configure(KafkaAvroSerializerConfig config) {
            // Forward the Schema Registry basic auth settings to the registry client.
            Map<String, String> configs = new HashMap<String, String>();
            configs.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
                    config.getString("basic.auth.credentials.source"));
            configs.put(SchemaRegistryClientConfig.USER_INFO_CONFIG,
                    config.getPassword("basic.auth.user.info").value());
            this.schemaRegistry = new CachedSchemaRegistryClient(
                    config.getSchemaRegistryUrls(), 1000, configs);
            this.strategyUsesSchema(true);
            // Register new schema versions automatically when producing.
            this.autoRegisterSchema = true;
            // this.useSchemaReflection = true;
            // this.normalizeSchema = true;
            // this.useLatestVersion = true;
            // this.avroReflectionAllowNull = true;
        }
    }
- Modify the Bean KafkaAvroDeserializerBean inside Kafka_Avro_Beans with the following content (a standalone consumer-side sketch that uses this bean follows this procedure):

    package org.example.local_project.beansjar.kafka_avro_beans;

    import java.util.HashMap;
    import java.util.Map;

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

    public class KafkaAvroDeserializerBean extends KafkaAvroDeserializer {

        @Override
        public void configure(KafkaAvroDeserializerConfig config) {
            // Forward the Schema Registry basic auth settings to the registry client.
            Map<String, String> configs = new HashMap<String, String>();
            configs.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
                    config.getString("basic.auth.credentials.source"));
            configs.put(SchemaRegistryClientConfig.USER_INFO_CONFIG,
                    config.getPassword("basic.auth.user.info").value());
            this.schemaRegistry = new CachedSchemaRegistryClient(
                    config.getSchemaRegistryUrls(), 1000, configs);
            // Deserialize values as GenericRecord rather than generated specific classes.
            this.useSpecificAvroReader = false;
        }
    }
- Create context variables for the Route: API_KEY of String type and API_SECRET of Password type, with the values of the API key and API secret issued by Confluent.
  For more information on context parameters, see Using contexts and variables. For more information about the API key and API secret, see the Confluent documentation.
- Create context parameters SCHEMA_REGISTRY_KEY of String type and SCHEMA_REGISTRY_SECRET of Password type, with the values of the Schema Registry key and Schema Registry secret issued by Confluent.
- In the Advanced settings tab of the cKafka components SendMessageToKafka and ReceiveMessageFromKafka, add the following parameters in the Kafka Properties field (a standalone sketch of the equivalent plain Kafka client properties follows this procedure).
  For more information on the security configuration, see the Confluent Kafka security documentation: https://docs.confluent.io/platform/current/security/auth-overview.html.
  Name:  "additionalProperties.basic.auth.user.info"
  Value: "RAW(" + context.SCHEMA_REGISTRY_KEY + ":" + context.SCHEMA_REGISTRY_SECRET + ")"

  Name:  "additionalProperties.basic.auth.credentials.source"
  Value: "USER_INFO"

  Name:  "additionalProperties.security.protocol"
  Value: "SASL_SSL"

  Name:  "additionalProperties.sasl.jaas.config"
  Value: "RAW(org.apache.kafka.common.security.plain.PlainLoginModule required username='" + context.API_KEY + "' password='" + context.API_SECRET + "';)"

  Name:  "additionalProperties.sasl.mechanism"
  Value: "PLAIN"
 
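Likewise, a minimal consumer-side sketch using KafkaAvroDeserializerBean; since the bean leaves useSpecificAvroReader at false, values are read as GenericRecord. The sketch class, endpoints, group id, topic, and credentials are again hypothetical placeholders.

    package org.example.local_project.beansjar.kafka_avro_beans;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SecuredConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Hypothetical endpoints and credentials; use the values issued by Confluent.
            props.put("bootstrap.servers", "pkc-xxxxx.region.provider.confluent.cloud:9092");
            props.put("group.id", "example-group");

            // Broker authentication, as in the Kafka Properties table above.
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required"
                            + " username='API_KEY' password='API_SECRET';");

            // Schema Registry authentication read by KafkaAvroDeserializerBean.configure(...).
            props.put("schema.registry.url", "https://psrc-xxxxx.region.provider.confluent.cloud");
            props.put("basic.auth.credentials.source", "USER_INFO");
            props.put("basic.auth.user.info", "SCHEMA_REGISTRY_KEY:SCHEMA_REGISTRY_SECRET");

            // String keys, Avro values decoded by the custom bean into GenericRecord.
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", KafkaAvroDeserializerBean.class.getName());

            try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my_topic"));
                ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }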