Configuring a consumer group
Create a consumer group to subscribe to the data-history topic
and use a filter to consume only events about tasks transitioned to the
Resolved state in Talend Data Stewardship.
Before you begin
Procedure
-
Create a Java class to subscribe to the data-history
topic:
- In GROUP_ID, add a consumer group identification, for
example externalGroupForTest.
Make sure to have a unique name different from any group identification used in Talend Data Stewardship.
- In KAFKA_URL, enter the address of the Kafka
server.
This address should be identical to the value of the bootstrap.servers parameter at <path_to_installation_folder>/kafka/config/consumer.properties.
- In TOPIC_NAME, enter
data-history which is the by-default topic in
Talend Data Stewardship.
The topic name should be identical to the value of the history.kafka.topic parameter at <path_to_installation_folder>/tds/apache-tomcat/conf/data-stewardship.properties.
- In GROUP_ID, add a consumer group identification, for
example externalGroupForTest.
-
Define the type of the event to track and on what condition. For example:
- In TASK_RESOURCE_TYPE, enter org.talend.datasteward.tasks.model.Task to track tasks.
- In CURRENT_STATE, enter currentState to enable checking the current state of the task.
- In TARGET_STATE, enter Resolved to verify the status is equal to Resolved.
-
Define a filter to send events only upon moving tasks to the
Resolved state, for example:
if(isTaskEvent(dataEvent) && isTransitionEvent(dataEvent) && isTargetStateResolved(dataEvent))
Example
The following example shows sample code for a Kafka Java consumer which reads resolved tasks from a topic and sends notifications to a specific Slack channel:package myPackage; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.net.*; import java.util.Arrays; import java.util.Properties; import com.google.gson.JsonObject; import java.io.BufferedReader; import java.io.DataOutputStream; import java.io.InputStream; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; public final class SimpleConsumer { private static final String GROUP_ID = "externalGroupForTest"; private static final String KAFKA_URL = "localhost:9092"; private static final String AUTO_COMMIT_INTERVAL = "1000"; private static final String SESSION_TIMEOUT = "30000"; private static final String TOPIC_NAME = "data-history"; private static final ObjectMapper objectMapper = new ObjectMapper(); public static final String TASK_RESOURCE_TYPE = "org.talend.datasteward.tasks.model.Task"; public static final String CURRENT_STATE = "currentState"; public static final String TARGET_STATE = "Resolved"; public static final String TRANSITION_ACTION = "transition"; private static final String SLACK_URL = "https://hooks.slack.com/services/xxxx/xxxx/xxxxx "; private static final String SLACK_USER_NAME = "TDS Resolved tasks bot"; public static void main(String[] args) throws Exception { // Kafka consumer configuration Properties props = new Properties(); props.put("bootstrap.servers", KAFKA_URL); props.put("group.id", GROUP_ID); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", AUTO_COMMIT_INTERVAL); props.put("session.timeout.ms", SESSION_TIMEOUT); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer(props); // Kafka consumer subscription consumer.subscribe(Arrays.asList(TOPIC_NAME)); // Start listening while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { // Get data event DataEvent dataEvent = objectMapper.readValue(record.value(), DataEvent.class); // Filter transition to Resolved state events if(isTaskEvent(dataEvent) && isTransitionEvent(dataEvent) && isTargetStateResolved(dataEvent)) { // Consume message, here is an example showing how to send a slack notification of the event // 1. Extract task id, previous and new state FieldModification fieldModification = dataEvent.getDetail().stream().filter(d -> "currentState".equals(d.getPath())).findFirst().get(); // 2. Build message String messageBody = "The state of task *" + dataEvent.getResourceId() + "* has been updated from *" + fieldModification.getPreviousValue() + "* to *" + fieldModification.getNewValue() + "*."; JsonObject message = new JsonObject(); message.addProperty("username", SLACK_USER_NAME); message.addProperty("text", messageBody); message.addProperty("unfurl_media", false); message.addProperty("unfurl_links", false); message.addProperty("link_names", false); // 3. Send message sendSlackNotification(message); } } } } private static boolean isTargetStateResolved(DataEvent x) { return x.getDetail().stream() .anyMatch(detail -> CURRENT_STATE.equals(detail.getPath()) && TARGET_STATE.equals(detail.getNewValue())); } private static boolean isTransitionEvent(DataEvent x) { return TRANSITION_ACTION.equals(x.getAction()); } private static boolean isTaskEvent(DataEvent x) { return TASK_RESOURCE_TYPE.equals(x.getResourceType()); } private static String sendSlackNotification(JsonObject msg) throws Exception { HttpURLConnection connection = null; try { // Create connection final URL url = new URL(SLACK_URL); connection = (HttpURLConnection) url.openConnection(Proxy.NO_PROXY); connection.setRequestMethod("POST"); connection.setUseCaches(false); connection.setDoInput(true); connection.setDoOutput(true); final String payload = "payload=" + URLEncoder.encode(msg.toString(), "UTF-8"); // Send request final DataOutputStream wr = new DataOutputStream(connection.getOutputStream()); wr.writeBytes(payload); wr.flush(); wr.close(); // Get Response final InputStream is = connection.getInputStream(); final BufferedReader rd = new BufferedReader(new InputStreamReader(is)); String line; StringBuilder response = new StringBuilder(); while ((line = rd.readLine()) != null) { response.append(line); response.append('\n'); } rd.close(); return response.toString(); } finally { if (connection != null) { connection.disconnect(); } } } }
- Build an integration using incoming Webhooks to send notifications of the Kafka events to slack in real time.
- Save your changes.
Results
Did this page help you?
If you find any issues with this page or its content – a typo, a missing step, or a technical error – please let us know!