Configuration de sécurité (facultative)
Procédure
-
Modifiez le Bean KafkaAvroSerializerBean dans Kafka_Avro_Beans avec le contenu suivant :
package org.example.local_project.beansjar.kafka_avro_beans; import java.util.HashMap; import java.util.Map; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; import io.confluent.kafka.serializers.KafkaAvroSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; public class KafkaAvroSerializerBean extends KafkaAvroSerializer { @Override public void configure(KafkaAvroSerializerConfig config) { Map<String, String> configs = new HashMap<String, String>(); configs.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, config.getString("basic.auth.credentials.source")); configs.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, config.getPassword("basic.auth.user.info").value()); this.schemaRegistry = new CachedSchemaRegistryClient( config.getSchemaRegistryUrls(), 1000, configs); this.strategyUsesSchema(true); this.autoRegisterSchema = true; // this.useSchemaReflection = true; // this.normalizeSchema = true; // this.useLatestVersion = true; // this.avroReflectionAllowNull = true; } }
-
Modifiez le Bean KafkaAvroDeserializerBean dans Kafka_Avro_Beans avec le contenu suivant :
package org.example.local_project.beansjar.kafka_avro_beans; import java.util.HashMap; import java.util.Map; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; public class KafkaAvroDeserializerBean extends KafkaAvroDeserializer { @Override public void configure(KafkaAvroDeserializerConfig config) { Map<String, String> configs = new HashMap<String, String>(); configs.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, config.getString("basic.auth.credentials.source")); configs.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, config.getPassword("basic.auth.user.info").value()); this.schemaRegistry = new CachedSchemaRegistryClient( config.getSchemaRegistryUrls(), 1000, configs); this.useSpecificAvroReader = false; }}
-
Créez des variables de contexte pour la Route, API_KEY de type String et API_SECRET de type Password avec les valeurs de API KEY et API SECRET émises par Confluent.
Pour plus d'informations concernant les paramètres de contexte, consultez Utiliser les contextes et les variables.Pour plus d'informations concernant API KEY et API SECRET, consultez la documentation Confluent (en anglais).
- Créez les paramètres de contexte SCHEMA_REGISTRY_KEY de type String et SCHEMA_REGISTRY_SECRET de type Password, avec les valeurs de SCHEMA REGISTRY KEY et SCHEMA REGISTRY SECRET émises par Confluent.
-
Dans l'onglet Advanced settings des composants cKafka SendMessageToKafka et ReceiveMessageFromKafka, ajoutez les paramètres suivants dans le champ Kafka Properties.
Pour plus d'informations concernant la configuration de la sécurité, consultez la documentation Confluent Kafka relative à la sécurité https://docs.confluent.io/platform/current/security/auth-overview.html (en anglais).Name (Nom) Value (Valeur) "additionalProperties.basic.auth.user.info"
"RAW(" + context.SCHEMA_REGISTRY_KEY + ":" + context.SCHEMA_REGISTRY_SECRET+")"
"additionalProperties.basic.auth.credentials.source"
"USER_INFO"
"additionalProperties.security.protocol"
"SASL_SSL"
"additionalProperties.sasl.jaas.config"
"RAW(org.apache.kafka.common.security.plain.PlainLoginModule required username='"+ context.API_KEY +"' password='" + context.API_SECRET + "';)"
"additionalProperties.sasl.mechanism"
"PLAIN"
Cette page vous a-t-elle aidé ?
Si vous rencontrez des problèmes sur cette page ou dans son contenu – une faute de frappe, une étape manquante ou une erreur technique – faites-le-nous savoir.