メイン コンテンツをスキップする 補完的コンテンツへスキップ

コンシューマグループの設定

data-historyトピックを購読するコンシューマグループを作成し、フィルターを使用してTalend Data Stewardship[Resolved] (解決済み)状態に移行したタスクに関するイベントのみを消費します。

始める前に

Talend Data Stewardshipが起動しています。

手順

  1. data-historyトピックを購読するためのJavaクラスを作成します:
    • GROUP_IDに、externalGroupForTestなどのコンシューマグループIDを追加します。

      Talend Data Stewardshipで使用したグループIDとは異なる固有の名前を付けてください。

    • KAFKA_URLに、Kafkaサーバーのアドレスを入力します。

      このアドレスは、<path_to_installation_folder>/kafka/config/consumer.propertiesにあるbootstrap.serversパラメーターの値と同じでなければなりません。

    • TOPIC_NAMETalend Data Stewardshipのデフォルトのトピックであるdata-historyを入力します。

      トピック名は、<path_to_installation_folder>/tds/apache-tomcat/conf/data-stewardship.propertiesにあるhistory.kafka.topicパラメーターの値と同じでなければなりません。

  2. 追跡するイベントの種類と条件を定義します。たとえば、次のような場合があります:
    • TASK_RESOURCE_TYPEに、org.talend.datasteward.tasks.model.Taskと入力してタスクを追跡します。
    • CURRENT_STATEに、currentStateと入力して、タスクの現在の状態を確認できるようにします。
    • TARGET_STATEResolvedと入力して、ステータスが[Resolved] (解決済み)と等しいことを確認します。
  3. タスクを[Resolved] (解決済み)状態に移行した時にのみイベントを送信するようにフィルターを定義します。次に例を示します:
    if(isTaskEvent(dataEvent) && isTransitionEvent(dataEvent) && isTargetStateResolved(dataEvent))

    次の例は、トピックから解決されたタスクを読み取り、特定のSlackチャネルに通知を送信するKafka Javaコンシューマのサンプルコードを示しています:
    package myPackage;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.net.*;
    import java.util.Arrays;
    import java.util.Properties;
    
    import com.google.gson.JsonObject;
    
    import java.io.BufferedReader;
    import java.io.DataOutputStream;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    
    public final class SimpleConsumer {
        private static final String GROUP_ID = "externalGroupForTest";
        private static final String KAFKA_URL = "localhost:9092";
        private static final String AUTO_COMMIT_INTERVAL = "1000";
        private static final String SESSION_TIMEOUT = "30000";
        private static final String TOPIC_NAME = "data-history";
    
        private static final ObjectMapper objectMapper = new ObjectMapper();
        public static final String TASK_RESOURCE_TYPE = "org.talend.datasteward.tasks.model.Task";
        public static final String CURRENT_STATE = "currentState";
        public static final String TARGET_STATE = "Resolved";
        public static final String TRANSITION_ACTION = "transition";
    
        private static final String SLACK_URL = "https://hooks.slack.com/services/xxxx/xxxx/xxxxx
    ";
        private static final String SLACK_USER_NAME = "TDS Resolved tasks bot";
    
        public static void main(String[] args) throws Exception {
            // Kafka consumer configuration
            Properties props = new Properties();
            props.put("bootstrap.servers", KAFKA_URL);
            props.put("group.id", GROUP_ID);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", AUTO_COMMIT_INTERVAL);
            props.put("session.timeout.ms", SESSION_TIMEOUT);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
    
            // Kafka consumer subscription
            consumer.subscribe(Arrays.asList(TOPIC_NAME));
    
            // Start listening
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // Get data event
                    DataEvent dataEvent = objectMapper.readValue(record.value(), DataEvent.class);
    
                    // Filter transition to Resolved state events
                    if(isTaskEvent(dataEvent) && isTransitionEvent(dataEvent) && isTargetStateResolved(dataEvent)) {
                        // Consume message, here is an example showing how to send a slack notification of the event
    
                        // 1. Extract task id, previous and new state
                        FieldModification fieldModification = dataEvent.getDetail().stream().filter(d -> "currentState".equals(d.getPath())).findFirst().get();
                        // 2. Build message
                        String messageBody = "The state of task *" + dataEvent.getResourceId() + "* has been updated from *"
                                + fieldModification.getPreviousValue() + "* to *" + fieldModification.getNewValue() + "*.";
                        JsonObject message = new JsonObject();
                        message.addProperty("username", SLACK_USER_NAME);
                        message.addProperty("text", messageBody);
                        message.addProperty("unfurl_media", false);
                        message.addProperty("unfurl_links", false);
                        message.addProperty("link_names", false);
    
                        // 3. Send message
                        sendSlackNotification(message);
                    }
                }
            }
        }
    
        private static boolean isTargetStateResolved(DataEvent x) {
           return x.getDetail().stream()
                    .anyMatch(detail -> CURRENT_STATE.equals(detail.getPath()) && TARGET_STATE.equals(detail.getNewValue()));
        }
    
        private static boolean isTransitionEvent(DataEvent x) {
            return TRANSITION_ACTION.equals(x.getAction());
        }
    
        private static boolean isTaskEvent(DataEvent x) {
            return TASK_RESOURCE_TYPE.equals(x.getResourceType());
        }
    
        private static String sendSlackNotification(JsonObject msg) throws Exception {
            HttpURLConnection connection = null;
            try {
                // Create connection
                final URL url = new URL(SLACK_URL);
                connection = (HttpURLConnection) url.openConnection(Proxy.NO_PROXY);
                connection.setRequestMethod("POST");
                connection.setUseCaches(false);
                connection.setDoInput(true);
                connection.setDoOutput(true);
    
                final String payload = "payload=" + URLEncoder.encode(msg.toString(), "UTF-8");
    
                // Send request
                final DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
                wr.writeBytes(payload);
                wr.flush();
                wr.close();
    
                // Get Response
                final InputStream is = connection.getInputStream();
                final BufferedReader rd = new BufferedReader(new InputStreamReader(is));
                String line;
                StringBuilder response = new StringBuilder();
                while ((line = rd.readLine()) != null) {
                    response.append(line);
                    response.append('\n');
                }
    
                rd.close();
                return response.toString();
            }  finally {
                if (connection != null) {
                    connection.disconnect();
                }
            }
        }
    }
  4. Kafkaイベントの通知をリアルタイムでSlackに受信するWebhookを使って統合を構築します。
  5. 変更を保存します。

タスクの結果

タスクステータスが[Resolved] (解決済み)に変更されるたびに、履歴イベントがトピックに送信され、通知メッセージがSlack内の指定されたチャネルに送信されます。

このページは役に立ちましたか?

このページまたはコンテンツにタイポ、ステップの省略、技術的エラーなどの問題が見つかった場合はお知らせください。