Configuring a consumer group

Create a consumer group to subscribe to the data-history topic and use a filter to consume only events about tasks transitioned to the Resolved state in Talend Data Stewardship.

Before you begin

Talend Data Stewardship is up and running.

Procedure

  1. Create a Java class to subscribe to the data-history topic:
    • In GROUP_ID, enter a consumer group identifier, for example externalGroupForTest.

      Make sure this identifier is unique and different from any group identifier used in Talend Data Stewardship.

    • In KAFKA_URL, enter the address of the Kafka server.

      This address must be identical to the value of the bootstrap.servers parameter in <path_to_installation_folder>/kafka/config/consumer.properties.

    • In TOPIC_NAME, enter data-history, which is the default topic in Talend Data Stewardship.

      The topic name must be identical to the value of the history.kafka.topic parameter in <path_to_installation_folder>/tds/apache-tomcat/conf/data-stewardship.properties.

  2. Define the type of event to track and the condition on which to track it. For example:
    • In TASK_RESOURCE_TYPE, enter org.talend.datasteward.tasks.model.Task to track tasks.
    • In CURRENT_STATE, enter currentState to enable checking the current state of the task.
    • In TARGET_STATE, enter Resolved to verify the status is equal to Resolved.
  3. Define a filter to process only the events generated when tasks are moved to the Resolved state, for example:
    if(isTaskEvent(dataEvent) && isTransitionEvent(dataEvent) && isTargetStateResolved(dataEvent))

    Example

    The following example shows sample code for a Kafka Java consumer that reads resolved task events from the topic and sends notifications to a specific Slack channel:
    package myPackage;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.net.*;
    import java.util.Arrays;
    import java.util.Properties;
    
    import com.google.gson.JsonObject;
    
    import java.io.BufferedReader;
    import java.io.DataOutputStream;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    
    public final class SimpleConsumer {
        private static final String GROUP_ID = "externalGroupForTest";
        private static final String KAFKA_URL = "localhost:9092";
        private static final String AUTO_COMMIT_INTERVAL = "1000";
        private static final String SESSION_TIMEOUT = "30000";
        private static final String TOPIC_NAME = "data-history";
    
        private static final ObjectMapper objectMapper = new ObjectMapper();
        public static final String TASK_RESOURCE_TYPE = "org.talend.datasteward.tasks.model.Task";
        public static final String CURRENT_STATE = "currentState";
        public static final String TARGET_STATE = "Resolved";
        public static final String TRANSITION_ACTION = "transition";
    
        private static final String SLACK_URL = "https://hooks.slack.com/services/xxxx/xxxx/xxxxx";
        private static final String SLACK_USER_NAME = "TDS Resolved tasks bot";
    
        public static void main(String[] args) throws Exception {
            // Kafka consumer configuration
            Properties props = new Properties();
            props.put("bootstrap.servers", KAFKA_URL);
            props.put("group.id", GROUP_ID);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", AUTO_COMMIT_INTERVAL);
            props.put("session.timeout.ms", SESSION_TIMEOUT);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
            // Kafka consumer subscription
            consumer.subscribe(Arrays.asList(TOPIC_NAME));
    
            // Start listening
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // Get data event
                    DataEvent dataEvent = objectMapper.readValue(record.value(), DataEvent.class);
    
                    // Filter transition to Resolved state events
                    if(isTaskEvent(dataEvent) && isTransitionEvent(dataEvent) && isTargetStateResolved(dataEvent)) {
                        // Consume message, here is an example showing how to send a slack notification of the event
    
                        // 1. Extract task id, previous and new state
                        FieldModification fieldModification = dataEvent.getDetail().stream().filter(d -> "currentState".equals(d.getPath())).findFirst().get();
                        // 2. Build message
                        String messageBody = "The state of task *" + dataEvent.getResourceId() + "* has been updated from *"
                                + fieldModification.getPreviousValue() + "* to *" + fieldModification.getNewValue() + "*.";
                        JsonObject message = new JsonObject();
                        message.addProperty("username", SLACK_USER_NAME);
                        message.addProperty("text", messageBody);
                        message.addProperty("unfurl_media", false);
                        message.addProperty("unfurl_links", false);
                        message.addProperty("link_names", false);
    
                        // 3. Send message
                        sendSlackNotification(message);
                    }
                }
            }
        }
    
        private static boolean isTargetStateResolved(DataEvent x) {
           return x.getDetail().stream()
                    .anyMatch(detail -> CURRENT_STATE.equals(detail.getPath()) && TARGET_STATE.equals(detail.getNewValue()));
        }
    
        private static boolean isTransitionEvent(DataEvent x) {
            return TRANSITION_ACTION.equals(x.getAction());
        }
    
        private static boolean isTaskEvent(DataEvent x) {
            return TASK_RESOURCE_TYPE.equals(x.getResourceType());
        }
    
        private static String sendSlackNotification(JsonObject msg) throws Exception {
            HttpURLConnection connection = null;
            try {
                // Create connection
                final URL url = new URL(SLACK_URL);
                connection = (HttpURLConnection) url.openConnection(Proxy.NO_PROXY);
                connection.setRequestMethod("POST");
                connection.setUseCaches(false);
                connection.setDoInput(true);
                connection.setDoOutput(true);
    
                final String payload = "payload=" + URLEncoder.encode(msg.toString(), "UTF-8");
    
                // Send request
                final DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
                wr.writeBytes(payload);
                wr.flush();
                wr.close();
    
                // Get Response
                final InputStream is = connection.getInputStream();
                final BufferedReader rd = new BufferedReader(new InputStreamReader(is));
                String line;
                StringBuilder response = new StringBuilder();
                while ((line = rd.readLine()) != null) {
                    response.append(line);
                    response.append('\n');
                }
    
                rd.close();
                return response.toString();
            }  finally {
                if (connection != null) {
                    connection.disconnect();
                }
            }
        }
    }
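
    The example relies on two model classes, DataEvent and FieldModification, which map the JSON events read from the data-history topic and are not shown above. The following is a minimal sketch deduced from the getters used in the example, not from a published schema: the field names and the String value types are assumptions, and any additional fields in the actual payload are ignored.

    // DataEvent.java
    package myPackage;
    
    import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
    
    import java.util.List;
    
    // Minimal sketch of a history event; only the fields used by SimpleConsumer are mapped.
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class DataEvent {
        private String action;                  // for example "transition"
        private String resourceType;            // for example "org.talend.datasteward.tasks.model.Task"
        private String resourceId;              // identifier of the modified task
        private List<FieldModification> detail; // list of field-level changes
    
        public String getAction() { return action; }
        public void setAction(String action) { this.action = action; }
    
        public String getResourceType() { return resourceType; }
        public void setResourceType(String resourceType) { this.resourceType = resourceType; }
    
        public String getResourceId() { return resourceId; }
        public void setResourceId(String resourceId) { this.resourceId = resourceId; }
    
        public List<FieldModification> getDetail() { return detail; }
        public void setDetail(List<FieldModification> detail) { this.detail = detail; }
    }
    
    // FieldModification.java (separate file)
    package myPackage;
    
    import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
    
    // Minimal sketch of a single field modification within a history event.
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class FieldModification {
        private String path;          // for example "currentState"
        private String previousValue; // value before the change
        private String newValue;      // value after the change
    
        public String getPath() { return path; }
        public void setPath(String path) { this.path = path; }
    
        public String getPreviousValue() { return previousValue; }
        public void setPreviousValue(String previousValue) { this.previousValue = previousValue; }
    
        public String getNewValue() { return newValue; }
        public void setNewValue(String newValue) { this.newValue = newValue; }
    }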
  4. Build an integration using Slack incoming webhooks to send notifications of the Kafka events to Slack in real time, for example as sketched below.
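
    Before wiring the consumer to Slack, you may want to check the webhook on its own. The following is a minimal sketch, not part of the product documentation, that posts a test message to a standard Slack incoming webhook; the class name and the placeholder URL are assumptions to replace with your own values:

    package myPackage;
    
    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    
    public final class SlackWebhookCheck {
    
        // Hypothetical placeholder: replace with the webhook URL generated for your Slack channel.
        private static final String SLACK_URL = "https://hooks.slack.com/services/xxxx/xxxx/xxxxx";
    
        public static void main(String[] args) throws Exception {
            // Incoming webhooks accept a JSON body; a simple text message is enough for a smoke test.
            String payload = "{\"text\": \"Test message from the TDS resolved-tasks consumer setup\"}";
    
            HttpURLConnection connection = (HttpURLConnection) new URL(SLACK_URL).openConnection();
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "application/json");
            connection.setDoOutput(true);
    
            // Send the test payload
            try (OutputStream os = connection.getOutputStream()) {
                os.write(payload.getBytes(StandardCharsets.UTF_8));
            }
    
            // A 200 response indicates that Slack accepted the message.
            System.out.println("Webhook response code: " + connection.getResponseCode());
            connection.disconnect();
        }
    }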
  5. Save your changes.

Results

Every time a task is moved to the Resolved state, a history event is sent to the data-history topic and a notification message is posted to the specified Slack channel.
