コンシューマグループの設定
data-historyトピックを購読するコンシューマグループを作成し、フィルターを使用してTalend Data Stewardshipで[Resolved] (解決済み)状態に移行したタスクに関するイベントのみを消費します。
始める前に
手順
-
data-historyトピックを購読するためのJavaクラスを作成します:
- GROUP_IDに、externalGroupForTestなどのコンシューマグループIDを追加します。
Talend Data Stewardshipで使用したグループIDとは異なる固有の名前を付けてください。
- KAFKA_URLに、Kafkaサーバーのアドレスを入力します。
このアドレスは、<path_to_installation_folder>/kafka/config/consumer.propertiesにあるbootstrap.serversパラメーターの値と同じでなければなりません。
- TOPIC_NAMEにTalend Data Stewardshipのデフォルトのトピックであるdata-historyを入力します。
トピック名は、<path_to_installation_folder>/tds/apache-tomcat/conf/data-stewardship.propertiesにあるhistory.kafka.topicパラメーターの値と同じでなければなりません。
- GROUP_IDに、externalGroupForTestなどのコンシューマグループIDを追加します。
-
追跡するイベントの種類と条件を定義します。たとえば、次のような場合があります:
- TASK_RESOURCE_TYPEに、org.talend.datasteward.tasks.model.Taskと入力してタスクを追跡します。
- CURRENT_STATEに、currentStateと入力して、タスクの現在の状態を確認できるようにします。
- TARGET_STATEにResolvedと入力して、ステータスが[Resolved] (解決済み)と等しいことを確認します。
-
タスクを[Resolved] (解決済み)状態に移行した時にのみイベントを送信するようにフィルターを定義します。次に例を示します:
if(isTaskEvent(dataEvent) && isTransitionEvent(dataEvent) && isTargetStateResolved(dataEvent))
例
次の例は、トピックから解決されたタスクを読み取り、特定のSlackチャネルに通知を送信するKafka Javaコンシューマのサンプルコードを示しています:package myPackage; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.net.*; import java.util.Arrays; import java.util.Properties; import com.google.gson.JsonObject; import java.io.BufferedReader; import java.io.DataOutputStream; import java.io.InputStream; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; public final class SimpleConsumer { private static final String GROUP_ID = "externalGroupForTest"; private static final String KAFKA_URL = "localhost:9092"; private static final String AUTO_COMMIT_INTERVAL = "1000"; private static final String SESSION_TIMEOUT = "30000"; private static final String TOPIC_NAME = "data-history"; private static final ObjectMapper objectMapper = new ObjectMapper(); public static final String TASK_RESOURCE_TYPE = "org.talend.datasteward.tasks.model.Task"; public static final String CURRENT_STATE = "currentState"; public static final String TARGET_STATE = "Resolved"; public static final String TRANSITION_ACTION = "transition"; private static final String SLACK_URL = "https://hooks.slack.com/services/xxxx/xxxx/xxxxx "; private static final String SLACK_USER_NAME = "TDS Resolved tasks bot"; public static void main(String[] args) throws Exception { // Kafka consumer configuration Properties props = new Properties(); props.put("bootstrap.servers", KAFKA_URL); props.put("group.id", GROUP_ID); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", AUTO_COMMIT_INTERVAL); props.put("session.timeout.ms", SESSION_TIMEOUT); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer(props); // Kafka consumer subscription consumer.subscribe(Arrays.asList(TOPIC_NAME)); // Start listening while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { // Get data event DataEvent dataEvent = objectMapper.readValue(record.value(), DataEvent.class); // Filter transition to Resolved state events if(isTaskEvent(dataEvent) && isTransitionEvent(dataEvent) && isTargetStateResolved(dataEvent)) { // Consume message, here is an example showing how to send a slack notification of the event // 1. Extract task id, previous and new state FieldModification fieldModification = dataEvent.getDetail().stream().filter(d -> "currentState".equals(d.getPath())).findFirst().get(); // 2. Build message String messageBody = "The state of task *" + dataEvent.getResourceId() + "* has been updated from *" + fieldModification.getPreviousValue() + "* to *" + fieldModification.getNewValue() + "*."; JsonObject message = new JsonObject(); message.addProperty("username", SLACK_USER_NAME); message.addProperty("text", messageBody); message.addProperty("unfurl_media", false); message.addProperty("unfurl_links", false); message.addProperty("link_names", false); // 3. Send message sendSlackNotification(message); } } } } private static boolean isTargetStateResolved(DataEvent x) { return x.getDetail().stream() .anyMatch(detail -> CURRENT_STATE.equals(detail.getPath()) && TARGET_STATE.equals(detail.getNewValue())); } private static boolean isTransitionEvent(DataEvent x) { return TRANSITION_ACTION.equals(x.getAction()); } private static boolean isTaskEvent(DataEvent x) { return TASK_RESOURCE_TYPE.equals(x.getResourceType()); } private static String sendSlackNotification(JsonObject msg) throws Exception { HttpURLConnection connection = null; try { // Create connection final URL url = new URL(SLACK_URL); connection = (HttpURLConnection) url.openConnection(Proxy.NO_PROXY); connection.setRequestMethod("POST"); connection.setUseCaches(false); connection.setDoInput(true); connection.setDoOutput(true); final String payload = "payload=" + URLEncoder.encode(msg.toString(), "UTF-8"); // Send request final DataOutputStream wr = new DataOutputStream(connection.getOutputStream()); wr.writeBytes(payload); wr.flush(); wr.close(); // Get Response final InputStream is = connection.getInputStream(); final BufferedReader rd = new BufferedReader(new InputStreamReader(is)); String line; StringBuilder response = new StringBuilder(); while ((line = rd.readLine()) != null) { response.append(line); response.append('\n'); } rd.close(); return response.toString(); } finally { if (connection != null) { connection.disconnect(); } } } }
- Kafkaイベントの通知をリアルタイムでSlackに受信するWebhookを使って統合を構築します。
- 変更を保存します。