セキュリティ設定(オプション)
手順
-
Kafka_Avro_Beans内にあるBean KafkaAvroSerializerBeanを以下の内容で変更します:
package org.example.local_project.beansjar.kafka_avro_beans; import java.util.HashMap; import java.util.Map; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; import io.confluent.kafka.serializers.KafkaAvroSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; public class KafkaAvroSerializerBean extends KafkaAvroSerializer { @Override public void configure(KafkaAvroSerializerConfig config) { Map<String, String> configs = new HashMap<String, String>(); configs.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, config.getString("basic.auth.credentials.source")); configs.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, config.getPassword("basic.auth.user.info").value()); this.schemaRegistry = new CachedSchemaRegistryClient( config.getSchemaRegistryUrls(), 1000, configs); this.strategyUsesSchema(true); this.autoRegisterSchema = true; // this.useSchemaReflection = true; // this.normalizeSchema = true; // this.useLatestVersion = true; // this.avroReflectionAllowNull = true; } }
-
Kafka_Avro_Beans内にあるBean KafkaAvroDeserializerBeanを以下の内容で変更します:
package org.example.local_project.beansjar.kafka_avro_beans; import java.util.HashMap; import java.util.Map; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; public class KafkaAvroDeserializerBean extends KafkaAvroDeserializer { @Override public void configure(KafkaAvroDeserializerConfig config) { Map<String, String> configs = new HashMap<String, String>(); configs.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, config.getString("basic.auth.credentials.source")); configs.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, config.getPassword("basic.auth.user.info").value()); this.schemaRegistry = new CachedSchemaRegistryClient( config.getSchemaRegistryUrls(), 1000, configs); this.useSpecificAvroReader = false; }}
-
Confluentによって発行されるAPIキーとAPIシークレットの値を使い、ルートのコンテキスト変数としてString型のAPI_KEYとPassword型のAPI_SECRETを作成します。
コンテキストパラメーターの詳細は、コンテキストと変数を使用をご覧ください。APIキーとAPIシークレットの詳細は、Confluentのドキュメンテーションをご覧ください。
- Confluentによって発行されるSCHEMA REGISTRY KEYとSCHEMA REGISTRY SECRETの値を使い、String型のSCHEMA_REGISTRY_KEYコンテキストパラメーターとPassword型のSCHEMA_REGISTRY_SECRETコンテキストパラメーターを作成します。
-
cKafkaのコンポーネントであるSendMessageToKafkaとReceiveMessageFromKafkaの[Advanced settings] (詳細設定)タブで、[Kafka Properties] (Kafkaのプロパティ)フィールドに次のパラメーターを追加します。
詳細は、Confluent Kafkaのセキュリティに関するドキュメンテーション(https://docs.confluent.io/platform/current/security/auth-overview.html)をご覧ください。名前 値 "additionalProperties.basic.auth.user.info"
"RAW(" + context.SCHEMA_REGISTRY_KEY + ":" + context.SCHEMA_REGISTRY_SECRET+")"
"additionalProperties.basic.auth.credentials.source"
"USER_INFO"
"additionalProperties.security.protocol"
"SASL_SSL"
"additionalProperties.sasl.jaas.config"
"RAW(org.apache.kafka.common.security.plain.PlainLoginModule required username='"+ context.API_KEY +"' password='" + context.API_SECRET + "';)"
"additionalProperties.sasl.mechanism"
"PLAIN"