
Reading streaming messages from a Google Pub/Sub topic

This scenario aims to help you set up and use connectors in a pipeline. You are advised to adapt it to your environment and use case.

Example of a pipeline created from the instructions below.

About this task

This scenario processes streaming JSON messages about books that are published to a Google Pub/Sub topic.
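
If your topic has no traffic yet, you can publish a few test messages yourself before building the pipeline. The sketch below is a minimal example using the google-cloud-pubsub Python client; it assumes your service account credentials are exposed through the GOOGLE_APPLICATION_CREDENTIALS environment variable, and the project ID (my-project), topic name (book-prices), and payload fields are illustrative placeholders, not values prescribed by this scenario.

    import json
    import time

    from google.cloud import pubsub_v1

    # Placeholder identifiers -- replace with your own project and topic.
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path("my-project", "book-prices")

    # Illustrative book-price records; the scenario only assumes JSON messages.
    books = [
        {"title": "Book A", "price": 12.99},
        {"title": "Book B", "price": 8.50},
        {"title": "Book C", "price": 21.00},
    ]

    for book in books:
        # Pub/Sub payloads are raw bytes, so serialize each record as JSON.
        data = json.dumps(book).encode("utf-8")
        message_id = publisher.publish(topic_path, data).result()
        print(f"Published message {message_id}")
        time.sleep(1)  # spread the messages across several 5-second windows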

Procedure

  1. Click Connections > Add connection.
  2. In the panel that opens, select the type of connection you want to create.

    Example

    Google Pub/Sub
  3. Select your engine in the Engine list.
    Note:
    • It is recommended to use the Remote Engine Gen2 rather than the Cloud Engine for Design for advanced data processing.
    • If no Remote Engine Gen2 has been created from Talend Management Console, or if it exists but appears as unavailable (meaning it is not up and running), you will not be able to select a connection type in the list or to save the new connection.
    • The list of available connection types depends on the engine you have selected.
  4. Select the type of connection you want to create.
    Here, select Google Pub/Sub.
  5. Fill in the connection properties to access your Google project, including your project name and JSON credentials, as described in Google Pub/Sub properties, check the connection, and click Add dataset.
  6. In the Add a new dataset panel, name your dataset book prices.
  7. Select Google Pub/Sub in the connection list.
  8. Fill in the required properties to access the messages in your Pub/Sub topic (topic name, subscription name, data format) and click View sample to see a preview of your dataset. If the topic and subscription do not exist yet, see the sketch after this procedure.
    Configuration of a new Google Pub/Sub dataset.
  9. Click Validate to save your dataset.
  10. Do the same to add a Test connection and dataset that will be used as a destination in your pipeline.
  11. Click Add pipeline on the Pipelines page. Your new pipeline opens.
  12. Click ADD SOURCE to open the panel allowing you to select your source data, here the JSON messages published to Pub/Sub.
  13. Select your dataset and click Select in order to add it to the pipeline.
    Rename it if needed.
  14. Click Add processor and add a Window processor to the pipeline. The configuration panel opens.
  15. Give a meaningful name to the processor.

    Example

    5sec window
  16. In the Configuration tab:
    1. Enable the Use Window session toggle.
    2. Type in 5000 as the window duration (in milliseconds) to capture data every 5 seconds.
  17. Click Save to save your configuration.
  18. Click ADD DESTINATION and select the test dataset that will hold your reorganized data.
    Rename it if needed.
  19. In the Configuration tab, enable the Log records to STDOUT toggle, as you want the records to be visible in the output logs.
  20. Click Save to save your configuration.
  21. On the top toolbar of Talend Cloud Pipeline Designer, click the Run button to open the panel allowing you to select your run profile.
  22. Select your run profile in the list (for more information, see Run profiles), then click Run to run your pipeline.
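
Step 8 above assumes that the topic and its subscription already exist in your Google project. If they do not, the following minimal sketch creates both with the google-cloud-pubsub Python client; my-project, book-prices, and book-prices-sub are placeholder names to replace with your own.

    from google.cloud import pubsub_v1

    project_id = "my-project"  # placeholder -- use your own project ID

    # Create the topic the messages are published to.
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, "book-prices")
    publisher.create_topic(name=topic_path)

    # Create the pull subscription the dataset reads from.
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, "book-prices-sub")
    subscriber.create_subscription(name=subscription_path, topic=topic_path)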

Results

Your pipeline is being executed; the messages published to the Pub/Sub topic are retrieved every 5 seconds and can be seen in the output logs. You can refresh the Metrics view in the Pipeline Details panel to see the number of records being incrementally updated.
Output log showing 6 records (256 bytes) produced.
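
If you want to confirm, independently of the pipeline, that messages are reaching the topic, you can attach a second subscription to the same topic and pull from it, so the pipeline's own subscription still receives every message. A minimal sketch, reusing the placeholder names above and assuming a verification subscription named verify-sub has already been created:

    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    # "verify-sub" is a hypothetical second subscription on the same topic.
    subscription_path = subscriber.subscription_path("my-project", "verify-sub")

    # Synchronously pull up to five messages for inspection.
    response = subscriber.pull(subscription=subscription_path, max_messages=5)
    for received in response.received_messages:
        print(received.message.data.decode("utf-8"))

    # Acknowledge so these messages are not redelivered to this subscription.
    ack_ids = [r.ack_id for r in response.received_messages]
    if ack_ids:
        subscriber.acknowledge(subscription=subscription_path, ack_ids=ack_ids)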
