Zu Hauptinhalt springen Zu ergänzendem Inhalt springen

Verarbeiten von Streamingdaten zu Flugzeugen

Eine Pipeline mit einer Kafka-Quelle, einem Prozessor vom Typ „Window (Fenster)“, einem Prozessor vom Typ „Aggregate (Aggregieren), einem Prozessor vom Typ „Field selector (Feldauswahl)“, einem Prozessor vom Typ „Python 3“ und einem MySQL-Ziel.

Vorbereitungen

  • Sie haben zuvor eine Verbindung zu dem System erstellt, in dem die Quelldaten gespeichert sind.

  • Sie haben zuvor den Datensatz hinzugefügt, der die Quelldaten enthält.

    In diesem Beispiel Streamingdaten zu Flugzeugen, d. h. Flugzeug-ID, Position und Zeitstempel.

    Um dieses Szenario besser zu verstehen, finden Sie nachstehend das AVRO-Schema der im Szenario verwendeten Streamingdaten:
    {
      "type": "record",
      "name": "aircrafts",
      "fields": [
      {"name": "Id", "type": "int"},
      {"name": "PosTime", "type": "long"},
      {"name": "Lat", "type": "double"},
      {"name": "Long", "type": "double"},
      {"name": "Op", "type": "string"}
      ]
    }

    Hierbei gilt: Id entspricht der Flugzeug-ID, PosTime dem Zeitstempel der Position, Lat/Long dem Breiten-/Ländengrad des Flugzeugs und Op den Fluggesellschaften.

  • Sie haben außerdem die Verbindung und den zugehörigen Datensatz erstellt, der die verarbeiteten Daten aufnehmen soll.

    In diesem Beispiel eine MySQL-Tabelle.

Prozedur

  1. Klicken Sie auf der Seite Pipelines auf Add pipeline (Pipeline hinzufügen). Ihre neue Pipeline wird geöffnet.
  2. Legen Sie einen sinnvollen Namen für die Pipeline fest.

    Example

    Process streaming aircraft data (Streamingdaten zu Flugzeugen verarbeiten)
  3. Klicken Sie auf ADD SOURCE (QUELLE HINZUFÜGEN), um ein Fenster zu öffnen, in dem Sie Ihre Quelldaten auswählen können, in diesem Beispiel das Flugzeug-Topic in Kafka.
    Vorschau eines Datenbeispiels mit Flugzeugdatensätzen
  4. Wählen Sie den Datensatz aus und klicken Sie auf Select (Auswählen), um ihn zur Pipeline hinzuzufügen.
    Benennen Sie ihn um, falls erforderlich.
  5. Klicken Sie auf Plus und fügen Sie einen Prozessor vom Typ Window (Fenster) zur Pipeline hinzu. Daraufhin wird das Konfigurationsfenster geöffnet.
  6. Geben Sie einen sinnvollen Namen für den Prozessor an.

    Example

    5sec window (5-Sek.-Fenster)
  7. Führen Sie im Bereich Configuration (Konfiguration) Folgendes durch:
    1. Aktivieren Sie die Option Use Window session (Fenstersitzung verwenden).
    2. Geben Sie den Wert 5000 als Fensterdauer ein, damit alle 5 Sekunden Daten erfasst werden.
  8. Klicken Sie auf Plus und fügen Sie einen Prozessor vom Typ Aggregate (Aggregieren) zur Pipeline hinzu. Daraufhin wird das Konfigurationsfenster geöffnet.
  9. Geben Sie einen sinnvollen Namen für den Prozessor an.

    Example

    group by aircraft (Nach Flugzeug gruppieren)
  10. Wählen Sie im Bereich Group by (Gruppieren nach) die Felder aus, die Sie für Ihre Aggregationsgruppe verwenden möchten: in diesem Beispiel .customerId (Kunden-ID).
    1. Wählen Sie .Id in der Liste Field (Feld) aus, um eine Gruppierung nach der Flugzeug-ID durchzuführen.
    2. Fügen Sie ein New element (Neues Element) hinzu und wählen Sie .Op (Betreiber) in der Liste Field (Feld) aus, um eine Gruppierung nach Fluggesellschaften durchzuführen.
  11. Führen Sie im Bereich Operations (Operationen) Folgendes durch:
    1. Wählen Sie .PosTime (.PositionUhrzeit) in der Liste Field (Feld) aus und Maximum in der Liste Operation aus.
    2. Geben Sie dem generierten Feld (Output field (Ausgabefeld)) einen Namen, z. B. date (Datum).
    3. Klicken Sie auf das Pluszeichen (+), um ein neues Element hinzuzufügen, wählen Sie .Lat (.Breitengrad) in der Liste Field path (Feldpfad) und List (Liste) in der Liste Operation aus.
    4. Legen Sie für das generierte Feld einen Namen fest, beispielsweise lat (Breitengrad).
    5. Klicken Sie auf das Pluszeichen (+), um ein neues Element hinzuzufügen, wählen Sie .Long (.Längengrad) in der Liste Field path (Feldpfad) und List (Liste) in der Liste Operation aus.
    6. Legen Sie für das generierte Feld einen Namen fest, beispielsweise lon (Längengrad).
  12. Klicken Sie auf Save (Speichern), um die Konfiguration zu speichern.
  13. Klicken Sie auf Plus und fügen Sie einen Prozessor vom Typ Field Selector (Feldauswahl) zur Pipeline hinzu. Daraufhin wird das Konfigurationsfenster geöffnet.
  14. Geben Sie einen sinnvollen Namen für den Prozessor an.

    Example

    select latest position (Letzte Position auswählen)
  15. Im Bereich Selectors (Selektoren) im Modus Advanced (Erweitert):
    1. Geben Sie id (ID) in die Liste Output (Ausgabe) und .id (.ID) in die Liste Input (Eingabe) ein, da Sie das Feld id (ID) auswählen und dieselbe Feldposition beibehalten möchten.
    2. Klicken Sie auf das Pluszeichen (+), um ein neues Element hinzuzufügen, und geben Sie airlines (Fluggesellschaften) in der Liste Output (Ausgabe) und .Op (.Betreiber) in der Liste Input (Eingabe) ein, da Sie das Feld Op (Betreiber) auswählen und umbenennen möchten.
    3. Klicken Sie auf das Pluszeichen (+), um ein neues Element hinzuzufügen, und geben Sie (date) (Datum) in der Liste Output (Ausgabe) und .date (.Datum) in der Liste Input (Eingabe) ein, da Sie das Feld date (Datum) auswählen und dieselbe Feldposition beibehalten möchten.
    4. Klicken Sie auf das Pluszeichen (+), um ein neues Element hinzuzufügen, und geben Sie lat (Breitengrad) in der Liste Output (Ausgabe) und .lat[-1] (.Breitengrad[-1]) in die Liste Input (Eingabe) ein, da Sie das Feld lat (Breitengrad) von der ursprünglichen Position auf eine niedrigere Ebene im Schema verschieben möchten.
    5. Klicken Sie auf das Pluszeichen (+), um ein neues Element hinzuzufügen, und geben Sie lon (Längengrad) in der Liste Output (Ausgabe) und .lon[-1] (.Längengrad[-1]) in der Liste Input (Eingabe) ein, da Sie das Feld lon (Längengrad) von der ursprünglichen Position auf eine niedrigere Ebene im Schema verschieben möchten.

      In diesem Bereich können Sie die avpath-Syntax verwenden.

  16. Klicken Sie auf Save (Speichern), um die Konfiguration zu speichern.
  17. Klicken Sie auf Plus und fügen Sie einen Prozessor vom Typ Python 3 zur Pipeline hinzu. Daraufhin wird das Konfigurationsfenster geöffnet.
  18. Geben Sie einen sinnvollen Namen für den Prozessor an.

    Example

    compute geohash (Geohash verarbeiten)
  19. Geben Sie im Bereich Phyton 3 code (Python 3-Code) Folgendes ein:
    def encode(latitude, longitude, precision=12):
      
        __base32 = '0123456789bcdefghjkmnpqrstuvwxyz'
        __decodemap = { }
        for i in range(len(__base32)):
            __decodemap[__base32[i]] = i
        del i
    
        lat_interval, lon_interval = (-90.0, 90.0), (-180.0, 180.0)
        geohash = []
        bits = [ 16, 8, 4, 2, 1 ]
        bit = 0
        ch = 0
        even = True
        while len(geohash) < precision:
            if even:
                mid = (lon_interval[0] + lon_interval[1]) / 2
                if longitude > mid:
                    ch |= bits[bit]
                    lon_interval = (mid, lon_interval[1])
                else:
                    lon_interval = (lon_interval[0], mid)
            else:
                mid = (lat_interval[0] + lat_interval[1]) / 2
                if latitude > mid:
                    ch |= bits[bit]
                    lat_interval = (mid, lat_interval[1])
                else:
                    lat_interval = (lat_interval[0], mid)
            even = not even
            if bit < 4:
                bit += 1
            else:
                geohash += __base32[ch]
                bit = 0
                ch = 0
        return ''.join(geohash)
    
    output = json.loads("{}")
    output['id'] = input['id']
    output['airlines'] = input['airlines']
    output['date'] = input['date']
    output['location'] = encode(input['lat'], input['lon'])
    
    output.append(output)

    Dieser Code ermöglicht Ihnen die Berechnung von Geohashes (eine Zeichenfolge, die die geografische Position ausgehend von Breiten- und Längengrad angibt).

  20. Klicken Sie auf Save (Speichern), um die Konfiguration zu speichern.
  21. Klicken Sie auf ADD DESTINATION (ZIEL HINZUFÜGEN) und wählen Sie den Datensatz aus, der die verarbeiteten Daten aufnehmen soll.
    Benennen Sie ihn um, falls erforderlich.
  22. (Optional) Sehen Sie sich die Vorschau des Prozessors vom Typ Python 3 an, um Ihre Daten zu prüfen.
    Vorschau des Prozessors vom Typ „Python 3“ nach dem Berechnen der Geohash-Informationen.
  23. Klicken Sie in der oberen Symbolleiste von Talend Cloud Pipeline Designer auf die Schaltfläche Run (Ausführen), um das Fenster zur Auswahl des Ausführungsprofils zu öffnen.
  24. Wählen Sie Ihr Ausführungsprofil in der Liste aus (weitere Informationen finden Sie unter „Ausführungsprofile“) und klicken Sie dann auf Run (Ausführen), um die Pipeline auszuführen.

Ergebnisse

Ihre Streaming-Pipeline wird ausgeführt und bleibt aktiv, bis Sie sie explizit beenden. Die Flugzeugdaten werden geändert und die berechneten Geohash-Daten an das von Ihnen angegebene Zielsystem gesendet.

Hat diese Seite Ihnen geholfen?

Wenn Sie ein Problem mit dieser Seite oder ihrem Inhalt feststellen, sei es ein Tippfehler, ein ausgelassener Schritt oder ein technischer Fehler, informieren Sie uns bitte!