Security configuration (optional)
Procedure
-
Modify the Bean KafkaAvroSerializerBean inside
Kafka_Avro_Beans with the following content:
package org.example.local_project.beansjar.kafka_avro_beans; import java.util.HashMap; import java.util.Map; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; import io.confluent.kafka.serializers.KafkaAvroSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; public class KafkaAvroSerializerBean extends KafkaAvroSerializer { @Override public void configure(KafkaAvroSerializerConfig config) { Map<String, String> configs = new HashMap<String, String>(); configs.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, config.getString("basic.auth.credentials.source")); configs.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, config.getPassword("basic.auth.user.info").value()); this.schemaRegistry = new CachedSchemaRegistryClient( config.getSchemaRegistryUrls(), 1000, configs); this.strategyUsesSchema(true); this.autoRegisterSchema = true; // this.useSchemaReflection = true; // this.normalizeSchema = true; // this.useLatestVersion = true; // this.avroReflectionAllowNull = true; } }
-
Modity the Bean KafkaAvroDeserializerBean inside
Kafka_Avro_Beans with the following content:
package org.example.local_project.beansjar.kafka_avro_beans; import java.util.HashMap; import java.util.Map; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; public class KafkaAvroDeserializerBean extends KafkaAvroDeserializer { @Override public void configure(KafkaAvroDeserializerConfig config) { Map<String, String> configs = new HashMap<String, String>(); configs.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, config.getString("basic.auth.credentials.source")); configs.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, config.getPassword("basic.auth.user.info").value()); this.schemaRegistry = new CachedSchemaRegistryClient( config.getSchemaRegistryUrls(), 1000, configs); this.useSpecificAvroReader = false; }}
-
Create context variables for the Route, API_KEY of String type
and API_SECRET of Password type with values of API KEY and API
SECRET issued by Confluent.
For more information on context parameters, see Using contexts and variables.For more information about API KEY and API SECRET, see Confluent documentation.
- Create context parameters SCHEMA_REGISTRY_KEY of String type and SCHEMA_REGISTRY_SECRET of Password type with values of SCHEMA REGISTRY KEY and SCHEMA REGISTRY SECRET issued by Confluent.
-
In the Advanced settings tab of the
cKafka components SendMessageToKafka
and ReceiveMessageFromKafka, add the following parameters in
the Kafka Properties field.
For more information on the security configuration, see the Confluent Kafka security documentation https://docs.confluent.io/platform/current/security/auth-overview.html.Name Value "additionalProperties.basic.auth.user.info"
"RAW(" + context.SCHEMA_REGISTRY_KEY + ":" + context.SCHEMA_REGISTRY_SECRET+")"
"additionalProperties.basic.auth.credentials.source"
"USER_INFO"
"additionalProperties.security.protocol"
"SASL_SSL"
"additionalProperties.sasl.jaas.config"
"RAW(org.apache.kafka.common.security.plain.PlainLoginModule required username='"+ context.API_KEY +"' password='" + context.API_SECRET + "';)"
"additionalProperties.sasl.mechanism"
"PLAIN"
Did this page help you?
If you find any issues with this page or its content – a typo, a missing step, or a technical error – please let us know!