Accéder au contenu principal Passer au contenu complémentaire

Propriétés du tKafkaInput pour Apache Spark Streaming

Ces propriétés sont utilisées pour configurer le tKafkaInput s'exécutant dans le framework de Jobs Spark Streaming.

Le composant tKafkaInput Spark Streaming appartient à la famille Messaging.

Ce composant est disponible dans Talend Real Time Big Data Platform et dans Talend Data Fabric.

Basic settings

Schema et Edit schema

Un schéma est une description de lignes. Il définit le nombre de champs (colonnes) à traiter et à passer au composant suivant. Lorsque vous créez un Job Spark, évitez le mot réservé line lors du nommage des champs.

Notez que le schéma de ce composant est en lecture seule. Il stocke le corps du message envoyé du producteur du message.

Output type

Sélectionnez le type de données à envoyer au composant suivant.

De manière générale, il est recommandé d'utiliser des types String, car le tKafkaInput peut traduire automatiquement les messages Kafka byte[] en chaînes de caractères à traiter par le Job. Cependant, si ce format de messages Kafka n'est pas connu par le tKafkaInput, comme le Protobuf, vous pouvez sélectionner byte[] et utiliser un composant Custom code comme le tJavaRow afin de désérialiser les messages en chaînes de caractères afin que les autres composants du même Job puissent traiter ces messages.

Broker list

Saisissez les adresses des nœuds du broker du cluster Kafka à utiliser.

L'adresse doit se présenter sous la forme suivante : hostname:port. Ces informations contiennent le nom et le port du nœud hébergeant dans le cluster Kafka.

Si vous devez spécifier plusieurs adresses, séparez-les à l'aide d'une virgule (,).

Starting offset

Sélectionnez le point de départ duquel les messages d'un topic sont consommés.

Dans Kafka, le numéro d'ID séquentiel d'un message se nomme offset. Dans cette liste, vous pouvez sélectionner From beginning pour commencer la consommation depuis le message le plus ancien du topic entier ou sélectionner From latest pour commencer depuis le message le plus récent ayant été consommé par le même groupe de consommateurs et à partir duquel l'offset a été commité.

Notez que, pour permettre au composant de se souvenir de la position d'un message consommé, vous devez activer le point de contrôle Spark Streaming dans l'onglet Spark Configuration de la vue Run du Job.

Chaque groupe de consommateurs possède son propre compteur pour se rappeler la position d'un message consommé. Pour cette raison, une fois qu'un groupe de consommateurs a commencé à consommer des messages d'un topic donné, un groupe de consommateurs reconnaît le message le plus récent en voyant simplement la position où son groupe a arrêté la consommation, plutôt que le topic complet. Partant de ce principe, les comportements suivants peuvent être attendus :

  • Un topic possède par exemple 100 messages. Si un groupe de consommateurs a arrêté la consommation du message à l'offset 50, lorsque vous sélectionnez From latest, le même groupe de consommateurs reprend à partir de l'offset 51.

  • Si vous créez un nouveau groupe de consommateurs ou en réinitialisez un existant, ce qui signifie que ce groupe n'a consommé aucun message de ce topic, lorsque vous le démarrez depuis le dernier message, ce nouveau groupe démarre et attend l'offset 101.

Topic name

Saisissez le nom du topic depuis lequel le tKafkaInput reçoit le flux de messages.

Group ID

Saisissez le nom du groupe de consommateurs auquel vous souhaitez que le consommateur courant (le tKafkaInput) appartienne.

Ce groupe de consommateurs sera créé lors de l'exécution s'il n'existe pas.

Cette propriété est disponible uniquement lorsque vous utilisez Spark 2.0 ou si la distribution Hadoop à utiliser exécute Spark 2.0. Si vous ne connaissez pas la version de Spark que vous utilisez, contactez l'administrateur de votre cluster pour plus d'informations.

Set number of records per second to read from each Kafka partition

Saisissez ce nombre entre guillemets doubles afin de limiter la taille de chaque batch à envoyer pour traitement.

Par exemple, si vous saisissez 100 et que la valeur du batch définie dans l'onglet Spark configuration est 2 secondes, la taille de partition pour chaque batch est de 200 messages.

Si vous laissez cette case décochée, le composant essaie de lire tous les messages disponibles en une seconde dans un batch avant d'envoyer ce dernier, ce qui peut conduire le Job à ne plus répondre s'il gère une grande quantité de messages.

Use SSL/TLS

Cochez cette case pour activer la connexion chiffrée SSL ou TLS.

Utilisez le composant tSetKeystore dans le même Job afin de spécifier les informations de chiffrement.

Cette propriété est disponible uniquement lorsque vous utilisez Spark 2.0 ou si la distribution Hadoop à utiliser exécute Spark 2.0. Si vous ne connaissez pas la version de Spark que vous utilisez, contactez l'administrateur de votre cluster pour plus d'informations.

Le fichier TrustStore et tout fichier KeyStore utilisé doivent être stockés localement, sur chaque nœud Spark hébergeant un exécuteur Spark.

Use Kerberos authentication

Si le cluster Kafka à utiliser est sécurisé par Kerberos, cochez cette case pour afficher les paramètres associés à définir :

  • JAAS configuration path : saisissez le chemin d'accès ou parcourez votre système jusqu'au fichier de configuration JAAS à utiliser par le Job pour authentification en tant que client à Kafka.

    Le fichier JAAS décrit comment les clients, les Jobs Kafka en termes Talend peuvent se connecter aux nœuds du broker Kafka, en utilisant soit le mode kinit, soit le mode keytab. Il doit être stocké sur la machine où sont exécutés les Jobs.

    Talend , Kerberos ou Kafka ne fournissent pas ce fichier JAAS. Vous devez le créer en suivant les explications dans Configuring Kafka client (uniquement en anglais) (en anglais), selon la stratégie de sécurité de votre entreprise.

  • Kafka brokers principal name : saisissez le membre primaire du Principal Kerberos défini pour les brokers lorsque vous avez créé le cluster de brokers. Par exemple, dans ce Principal kafka/kafka1.hostname.com@EXAMPLE.COM, le membre primaire à utiliser pour renseigner ce champ est kafka.

  • Set kinit command path : Kerberos utilise un chemin par défaut pour son exécutable kinit. Si vous avez modifié ce chemin, cochez cette case et saisissez votre chemin d'accès personnalisé.

    Si vous laissez cette case décochée, le chemin par défaut est utilisé.

  • Set Kerberos configuration path : Kerberos utilise un chemin par défaut vers son fichier de configuration, le fichier krb5.conf (ou krb5.ini sous Windows) pour Kerberos 5 par exemple. Si vous avez modifié ce chemin, cochez cette case et saisissez le chemin d'accès personnalisé au fichier de configuration Kerberos.

    Si vous laissez cette case décochée, une stratégie donnée est appliquée par Kerberos pour tenter de trouver les informations de configuration nécessaires. Pour plus d'informations concernant cette stratégie, consultez la section Locating the krb5.conf Configuration File dans Kerberos requirements (en anglais).

Pour plus d'informations concernant la manière dont est sécurisé un cluster Kafka via Kerberos, consultez Authenticating using SASL (uniquement en anglais) (en anglais).

Cette case est disponible depuis Kafka 0.9.0.1.

Advanced settings

Kafka properties

Ajoutez les propriétés de consommation Kafka nécessaires pour personnaliser cette table. Par exemple, configurez une valeur spécifique zookeeper.connection.timeout.ms pour éviter l'exception ZkTimeoutException.

Pour plus d'informations concernant les propriétés de consommation à définir dans cette table, consultez la section décrivant la configuration du consommateur dans la documentation Kafka, à l'adresse suivante : http://kafka.apache.org/documentation.html#consumerconfigs (uniquement en anglais) (en anglais).

Encoding

Sélectionnez l'encodage à partir de la liste ou sélectionnez Custom et définissez-le manuellement.

Cet encodage est utilisé par le tKafkaInput pour décoder les messages d'entrée.

Utilisation

Règle d'utilisation

Ce composant est utilisé en tant que composant de début et nécessite un lien de sortie.

Ce composant, ainsi que les composants Spark Streaming de la Palette à laquelle il appartient, s'affichent uniquement lorsque vous créez un Job Spark Streaming.

Notez que, dans cette documentation, sauf mention contraire, un scénario présente uniquement des Jobs Standard, c'est-à-dire des Jobs Talend traditionnels d'intégration de données.

Dans l'implémentation du composant courant dans Spark, les offsets Kafka sont automatiquement gérés par Spark, c'est-à-dire, au lieu d'être commités dans Zookeeper ou Kafka, les offsets sont suivis dans les points de contrôle Spark. Pour plus d'informations concernant cette implémentation, consultez la section relative à l'approche directe dans la documentation de Spark : http://spark.apache.org/docs/latest/streaming-kafka-integration.html (uniquement en anglais) (en anglais).

Connexion à Spark

Dans l'onglet Spark Configuration de la vue Run, définissez la connexion à un cluster Spark donné pour le Job complet. De plus, puisque le Job attend ses fichiers .jar dépendants pour l'exécution, vous devez spécifier le répertoire du système de fichiers dans lequel ces fichiers .jar sont transférés afin que Spark puisse accéder à ces fichiers :
  • Yarn mode (Yarn Client ou Yarn Cluster) :
    • Lorsque vous utilisez Google Dataproc, spécifiez un bucket dans le champ Google Storage staging bucket de l'onglet Spark configuration.

    • Lorsque vous utilisez HDInsight, spécifiez le blob à utiliser pour le déploiement du Job, dans la zone Windows Azure Storage configuration de l'onglet Spark configuration.

    • Lorsque vous utilisez Altus, spécifiez le bucket S3 ou le stockage Azure Data Lake Storage (aperçu technique) pour le déploiement du Job, dans l'onglet Spark configuration.
    • Lorsque vous utilisez Qubole, ajoutez tS3Configuration à votre Job pour écrire vos données métier dans le système S3 avec Qubole. Sans tS3Configuration, ces données métier sont écrites dans le système Qubole HDFS et détruites une fois que vous arrêtez votre cluster.
    • Lorsque vous utilisez des distributions sur site (on-premises), utilisez le composant de configuration correspondant au système de fichiers utilisé par votre cluster. Généralement, ce système est HDFS et vous devez utiliser le tHDFSConfiguration (en anglais).

  • Standalone mode : utilisez le composant de configuration correspondant au système de fichiers que votre cluster utilise, comme le tHDFSConfiguration Apache Spark Batch ou le tS3Configuration Apache Spark Batch (en anglais).

    Si vous utilisez Databricks sans composant de configuration dans votre Job, vos données métier sont écrites directement dans DBFS (Databricks Filesystem).

Cette connexion fonctionne uniquement pour le Job dans lequel vous l'avez définie.

Cette page vous a-t-elle aidé ?

Si vous rencontrez des problèmes sur cette page ou dans son contenu – une faute de frappe, une étape manquante ou une erreur technique – faites-le-nous savoir.