メイン コンテンツをスキップする 補完的コンテンツへスキップ

セキュリティ設定(オプション)

手順

  1. Kafka_Avro_Beans内にあるBean KafkaAvroSerializerBeanを以下の内容で変更します:
    package org.example.local_project.beansjar.kafka_avro_beans;
    
    import java.util.HashMap;
    import java.util.Map;
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
    
    public class KafkaAvroSerializerBean extends KafkaAvroSerializer {
    
    	@Override
    	public void configure(KafkaAvroSerializerConfig config) {
    
          	Map<String, String> configs = new HashMap<String, String>();
          
    configs.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
       config.getString("basic.auth.credentials.source"));
    configs.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, 
       config.getPassword("basic.auth.user.info").value());       		
           this.schemaRegistry = new CachedSchemaRegistryClient(  
              config.getSchemaRegistryUrls(), 1000, configs);
           this.strategyUsesSchema(true);
           this.autoRegisterSchema = true;
    //	  this.useSchemaReflection = true;
    //	  this.normalizeSchema = true;
    //	  this.useLatestVersion = true; 
    //	  this.avroReflectionAllowNull = true;
    	}
    }
  2. Kafka_Avro_Beans内にあるBean KafkaAvroDeserializerBeanを以下の内容で変更します:
    package org.example.local_project.beansjar.kafka_avro_beans;
     
    import java.util.HashMap;
    import java.util.Map;
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
     
    public class KafkaAvroDeserializerBean extends KafkaAvroDeserializer {
     
    @Override
    public void configure(KafkaAvroDeserializerConfig config) {
        Map<String, String> configs = new HashMap<String, String>();
          
        configs.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
            config.getString("basic.auth.credentials.source"));
        configs.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, 
            config.getPassword("basic.auth.user.info").value());       		
        this.schemaRegistry = new CachedSchemaRegistryClient(  
            config.getSchemaRegistryUrls(), 1000, configs);
        this.useSpecificAvroReader = false;
    }}
  3. Confluentによって発行されるAPIキーとAPIシークレットの値を使い、ルートのコンテキスト変数としてString型のAPI_KEYとPassword型のAPI_SECRETを作成します。
    コンテキストパラメーターの詳細は、コンテキストと変数を使用をご覧ください。
    APIキーとAPIシークレットの詳細は、Confluentのドキュメンテーションをご覧ください。
  4. Confluentによって発行されるSCHEMA REGISTRY KEYとSCHEMA REGISTRY SECRETの値を使い、String型のSCHEMA_REGISTRY_KEYコンテキストパラメーターとPassword型のSCHEMA_REGISTRY_SECRETコンテキストパラメーターを作成します。
  5. cKafkaのコンポーネントであるSendMessageToKafkaReceiveMessageFromKafka[Advanced settings] (詳細設定)タブで、[Kafka Properties] (Kafkaのプロパティ)フィールドに次のパラメーターを追加します。
    名前

    "additionalProperties.basic.auth.user.info"

    "RAW(" + context.SCHEMA_REGISTRY_KEY + ":" + context.SCHEMA_REGISTRY_SECRET+")"

    "additionalProperties.basic.auth.credentials.source"

    "USER_INFO"

    "additionalProperties.security.protocol"

    "SASL_SSL"

    "additionalProperties.sasl.jaas.config"

    "org.apache.kafka.common.security.plain.PlainLoginModule required username='"+ context.API_KEY +"' password='" + context.API_SECRET + "';"

    "additionalProperties.sasl.mechanism"

    "PLAIN"

    詳細は、Confluent Kafkaのセキュリティに関するドキュメンテーション(https://docs.confluent.io/platform/current/security/auth-overview.html)をご覧ください。

このページは役に立ちましたか?

このページまたはコンテンツにタイポ、ステップの省略、技術的エラーなどの問題が見つかった場合はお知らせください。