Configurer un groupe de consommateurs
Créer un groupe de consommateurs pour souscrire au sujet data-history et utiliser un filtre pour suivre uniquement les évènements concernant les tâches en transition vers le statut Resolved dans Talend Data Stewardship.
Avant de commencer
Procédure
-
Créez une classe Java pour souscrire au sujet data-history :
- Dans GROUP_ID, ajoutez un nom d'identification pour le groupe de consommateurs, par exemple externalGroupForTest.
Assurez-vous de choisir un nom d'identification unique, différent de ceux déjà utilisés dans Talend Data Stewardship.
- Dans KAFKA_URL, saisissez l'adresse du serveur Kafka.
Cette adresse doit être identique à la valeur du paramètre bootstrap.servers dans le fichier <path_to_installation_folder>/kafka/config/consumer.properties.
- Dans TOPIC_NAME, saisissez data-history, qui est le sujet par défaut dans Talend Data Stewardship.
Le nom du sujet doit être identique à la valeur du paramètre history.kafka.topic dans le fichier <path_to_installation_folder>/tds/apache-tomcat/conf/data-stewardship.properties.
- Dans GROUP_ID, ajoutez un nom d'identification pour le groupe de consommateurs, par exemple externalGroupForTest.
-
Configurez le type des évènements à suivre et les conditions de suivi. Par exemple :
- Dans TASK_RESOURCE_TYPE, saisissez org.talend.datasteward.tasks.model.Task pour suivre des tâches.
- Dans CURRENT_STATE, saisissez currentState pour activer la vérification du statut de la tâche.
- Dans TARGET_STATE, saisissez Resolved pour vérifier si le statut de la tâche est bien Resolved.
-
Configurez un filtre pour envoyer des évènements uniquement pour les tâches en transition vers le statut Resolved, par exemple :
if(isTaskEvent(dataEvent) && isTransitionEvent(dataEvent) && isTargetStateResolved(dataEvent))
Exemple
Vous trouverez ci-dessous un exemple de code pour un consommateur Java de Kafka, qui lit des tâches résolues depuis un sujet et envoie des notifications dans une chaîne Slack en particulier :package myPackage; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.net.*; import java.util.Arrays; import java.util.Properties; import com.google.gson.JsonObject; import java.io.BufferedReader; import java.io.DataOutputStream; import java.io.InputStream; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; public final class SimpleConsumer { private static final String GROUP_ID = "externalGroupForTest"; private static final String KAFKA_URL = "localhost:9092"; private static final String AUTO_COMMIT_INTERVAL = "1000"; private static final String SESSION_TIMEOUT = "30000"; private static final String TOPIC_NAME = "data-history"; private static final ObjectMapper objectMapper = new ObjectMapper(); public static final String TASK_RESOURCE_TYPE = "org.talend.datasteward.tasks.model.Task"; public static final String CURRENT_STATE = "currentState"; public static final String TARGET_STATE = "Resolved"; public static final String TRANSITION_ACTION = "transition"; private static final String SLACK_URL = "https://hooks.slack.com/services/xxxx/xxxx/xxxxx "; private static final String SLACK_USER_NAME = "TDS Resolved tasks bot"; public static void main(String[] args) throws Exception { // Kafka consumer configuration Properties props = new Properties(); props.put("bootstrap.servers", KAFKA_URL); props.put("group.id", GROUP_ID); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", AUTO_COMMIT_INTERVAL); props.put("session.timeout.ms", SESSION_TIMEOUT); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer(props); // Kafka consumer subscription consumer.subscribe(Arrays.asList(TOPIC_NAME)); // Start listening while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { // Get data event DataEvent dataEvent = objectMapper.readValue(record.value(), DataEvent.class); // Filter transition to Resolved state events if(isTaskEvent(dataEvent) && isTransitionEvent(dataEvent) && isTargetStateResolved(dataEvent)) { // Consume message, here is an example showing how to send a slack notification of the event // 1. Extract task id, previous and new state FieldModification fieldModification = dataEvent.getDetail().stream().filter(d -> "currentState".equals(d.getPath())).findFirst().get(); // 2. Build message String messageBody = "The state of task *" + dataEvent.getResourceId() + "* has been updated from *" + fieldModification.getPreviousValue() + "* to *" + fieldModification.getNewValue() + "*."; JsonObject message = new JsonObject(); message.addProperty("username", SLACK_USER_NAME); message.addProperty("text", messageBody); message.addProperty("unfurl_media", false); message.addProperty("unfurl_links", false); message.addProperty("link_names", false); // 3. Send message sendSlackNotification(message); } } } } private static boolean isTargetStateResolved(DataEvent x) { return x.getDetail().stream() .anyMatch(detail -> CURRENT_STATE.equals(detail.getPath()) && TARGET_STATE.equals(detail.getNewValue())); } private static boolean isTransitionEvent(DataEvent x) { return TRANSITION_ACTION.equals(x.getAction()); } private static boolean isTaskEvent(DataEvent x) { return TASK_RESOURCE_TYPE.equals(x.getResourceType()); } private static String sendSlackNotification(JsonObject msg) throws Exception { HttpURLConnection connection = null; try { // Create connection final URL url = new URL(SLACK_URL); connection = (HttpURLConnection) url.openConnection(Proxy.NO_PROXY); connection.setRequestMethod("POST"); connection.setUseCaches(false); connection.setDoInput(true); connection.setDoOutput(true); final String payload = "payload=" + URLEncoder.encode(msg.toString(), "UTF-8"); // Send request final DataOutputStream wr = new DataOutputStream(connection.getOutputStream()); wr.writeBytes(payload); wr.flush(); wr.close(); // Get Response final InputStream is = connection.getInputStream(); final BufferedReader rd = new BufferedReader(new InputStreamReader(is)); String line; StringBuilder response = new StringBuilder(); while ((line = rd.readLine()) != null) { response.append(line); response.append('\n'); } rd.close(); return response.toString(); } finally { if (connection != null) { connection.disconnect(); } } } }
- Créez une intégration utilisant des webhooks entrants pour envoyer des notifications d'évènements Kafka dans Slack en temps réel.
- Sauvegardez vos modifications.
Résultats
Cette page vous a-t-elle aidé ?
Si vous rencontrez des problèmes sur cette page ou dans son contenu – une faute de frappe, une étape manquante ou une erreur technique – faites-le-nous savoir.