Accéder au contenu principal Passer au contenu complémentaire

Configurer un groupe de consommateurs

Créer un groupe de consommateurs pour souscrire au sujet data-history et utiliser un filtre pour suivre uniquement les évènements concernant les tâches en transition vers le statut Resolved dans Talend Data Stewardship.

Avant de commencer

Talend Data Stewardship est lancé et en cours d'exécution.

Procédure

  1. Créez une classe Java pour souscrire au sujet data-history :
    • Dans GROUP_ID, ajoutez un nom d'identification pour le groupe de consommateurs, par exemple externalGroupForTest.

      Assurez-vous de choisir un nom d'identification unique, différent de ceux déjà utilisés dans Talend Data Stewardship.

    • Dans KAFKA_URL, saisissez l'adresse du serveur Kafka.

      Cette adresse doit être identique à la valeur du paramètre bootstrap.servers dans le fichier <path_to_installation_folder>/kafka/config/consumer.properties.

    • Dans TOPIC_NAME, saisissez data-history, qui est le sujet par défaut dans Talend Data Stewardship.

      Le nom du sujet doit être identique à la valeur du paramètre history.kafka.topic dans le fichier <path_to_installation_folder>/tds/apache-tomcat/conf/data-stewardship.properties.

  2. Configurez le type des évènements à suivre et les conditions de suivi. Par exemple :
    • Dans TASK_RESOURCE_TYPE, saisissez org.talend.datasteward.tasks.model.Task pour suivre des tâches.
    • Dans CURRENT_STATE, saisissez currentState pour activer la vérification du statut de la tâche.
    • Dans TARGET_STATE, saisissez Resolved pour vérifier si le statut de la tâche est bien Resolved.
  3. Configurez un filtre pour envoyer des évènements uniquement pour les tâches en transition vers le statut Resolved, par exemple :
    if(isTaskEvent(dataEvent) && isTransitionEvent(dataEvent) && isTargetStateResolved(dataEvent))

    Exemple

    Vous trouverez ci-dessous un exemple de code pour un consommateur Java de Kafka, qui lit des tâches résolues depuis un sujet et envoie des notifications dans une chaîne Slack en particulier :
    package myPackage;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.net.*;
    import java.util.Arrays;
    import java.util.Properties;
    
    import com.google.gson.JsonObject;
    
    import java.io.BufferedReader;
    import java.io.DataOutputStream;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    
    public final class SimpleConsumer {
        private static final String GROUP_ID = "externalGroupForTest";
        private static final String KAFKA_URL = "localhost:9092";
        private static final String AUTO_COMMIT_INTERVAL = "1000";
        private static final String SESSION_TIMEOUT = "30000";
        private static final String TOPIC_NAME = "data-history";
    
        private static final ObjectMapper objectMapper = new ObjectMapper();
        public static final String TASK_RESOURCE_TYPE = "org.talend.datasteward.tasks.model.Task";
        public static final String CURRENT_STATE = "currentState";
        public static final String TARGET_STATE = "Resolved";
        public static final String TRANSITION_ACTION = "transition";
    
        private static final String SLACK_URL = "https://hooks.slack.com/services/xxxx/xxxx/xxxxx
    ";
        private static final String SLACK_USER_NAME = "TDS Resolved tasks bot";
    
        public static void main(String[] args) throws Exception {
            // Kafka consumer configuration
            Properties props = new Properties();
            props.put("bootstrap.servers", KAFKA_URL);
            props.put("group.id", GROUP_ID);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", AUTO_COMMIT_INTERVAL);
            props.put("session.timeout.ms", SESSION_TIMEOUT);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
    
            // Kafka consumer subscription
            consumer.subscribe(Arrays.asList(TOPIC_NAME));
    
            // Start listening
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // Get data event
                    DataEvent dataEvent = objectMapper.readValue(record.value(), DataEvent.class);
    
                    // Filter transition to Resolved state events
                    if(isTaskEvent(dataEvent) && isTransitionEvent(dataEvent) && isTargetStateResolved(dataEvent)) {
                        // Consume message, here is an example showing how to send a slack notification of the event
    
                        // 1. Extract task id, previous and new state
                        FieldModification fieldModification = dataEvent.getDetail().stream().filter(d -> "currentState".equals(d.getPath())).findFirst().get();
                        // 2. Build message
                        String messageBody = "The state of task *" + dataEvent.getResourceId() + "* has been updated from *"
                                + fieldModification.getPreviousValue() + "* to *" + fieldModification.getNewValue() + "*.";
                        JsonObject message = new JsonObject();
                        message.addProperty("username", SLACK_USER_NAME);
                        message.addProperty("text", messageBody);
                        message.addProperty("unfurl_media", false);
                        message.addProperty("unfurl_links", false);
                        message.addProperty("link_names", false);
    
                        // 3. Send message
                        sendSlackNotification(message);
                    }
                }
            }
        }
    
        private static boolean isTargetStateResolved(DataEvent x) {
           return x.getDetail().stream()
                    .anyMatch(detail -> CURRENT_STATE.equals(detail.getPath()) && TARGET_STATE.equals(detail.getNewValue()));
        }
    
        private static boolean isTransitionEvent(DataEvent x) {
            return TRANSITION_ACTION.equals(x.getAction());
        }
    
        private static boolean isTaskEvent(DataEvent x) {
            return TASK_RESOURCE_TYPE.equals(x.getResourceType());
        }
    
        private static String sendSlackNotification(JsonObject msg) throws Exception {
            HttpURLConnection connection = null;
            try {
                // Create connection
                final URL url = new URL(SLACK_URL);
                connection = (HttpURLConnection) url.openConnection(Proxy.NO_PROXY);
                connection.setRequestMethod("POST");
                connection.setUseCaches(false);
                connection.setDoInput(true);
                connection.setDoOutput(true);
    
                final String payload = "payload=" + URLEncoder.encode(msg.toString(), "UTF-8");
    
                // Send request
                final DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
                wr.writeBytes(payload);
                wr.flush();
                wr.close();
    
                // Get Response
                final InputStream is = connection.getInputStream();
                final BufferedReader rd = new BufferedReader(new InputStreamReader(is));
                String line;
                StringBuilder response = new StringBuilder();
                while ((line = rd.readLine()) != null) {
                    response.append(line);
                    response.append('\n');
                }
    
                rd.close();
                return response.toString();
            }  finally {
                if (connection != null) {
                    connection.disconnect();
                }
            }
        }
    }
  4. Créez une intégration utilisant des webhooks entrants pour envoyer des notifications d'évènements Kafka dans Slack en temps réel.
  5. Sauvegardez vos modifications.

Résultats

Chaque fois que le statut de la tâche passe à Resolved, un historique des évènements est envoyé dans le sujet et un message de notification est posté dans la chaîne Slack spécifiée.

Cette page vous a-t-elle aidé ?

Si vous rencontrez des problèmes sur cette page ou dans son contenu – une faute de frappe, une étape manquante ou une erreur technique – faites-le-nous savoir.