Skip to main content Skip to complementary content

Publishing a message to an Apache Pulsar topic

This scenario aims at helping you set up and use connectors in a pipeline. You are advised to adapt it to your environment and use case.

Example of a pipeline created from the instructions below.

Procedure

  1. Click Connections > Add connection.
  2. In the panel that opens, select the type of connection you want to create.

    Example

    data generator
  3. Select your engine in the Engine list.
    Information noteNote:
    • It is recommended to use the Remote Engine Gen2 rather than the Cloud Engine for Design for advanced processing of data.
    • If no Remote Engine Gen2 has been created from Talend Management Console or if it exists but appears as unavailable which means it is not up and running, you will not be able to select a Connection type in the list nor to save the new connection.
    • The list of available connection types depends on the engine you have selected.
  4. Select the type of connection you want to create.
    Here, select Data generator.
  5. Click Add dataset and fill in the dataset properties as described in Data generator properties.
  6. In the Add a new dataset panel, name your dataset.

    Example

    customer generated data
  7. Fill in the properties to generate the test customer data of your choice. In this example:
    1. In the Rows field, type in 100 as you want to generate 100 test records.
    2. Click Add field, type in firstname in the Name field of the element, select First Name in the Type list and type in 0 in the Blank % field as you want to generate random first names with no empty fields.
    3. Click Add field, type in lastname in the Name field of the element, select Last Name in the Type list and type in 0 in the Blank % field as you want to generate random last names with no empty fields.
    4. Click Add field, type in age in the Name field of the element, select Age in the Type list, type in 18 in the Min field and 99 in the Max field and type in 0 in the Blank % field, as you want to generate ages between 18 and 99 with no empty fields.
    Head sample of the customers generated data with first name, last name, and age columns.
  8. Click Connections > Add connection.
  9. Select the type of connection you want to create.
    Here, select Apache Pulsar.
  10. Fill in the connection properties to safely access your Apache Pulsar broker as described in Apache Pulsar properties, check the connection and click Add dataset.
  11. In the Add a new dataset panel, name your dataset. In this example, the customer-age topic that is currently empty will be used to publish the data about processed customer information.
    Configuration of a new Pulsar dataset.
  12. Name your dataset, Customers on Pulsar for example.
  13. Click Validate to save your dataset.
  14. Click Add pipeline on the Pipelines page. Your new pipeline opens.
  15. Give the pipeline a meaningful name.

    Example

    From Data generator to Pulsar - publish msg to Pulsar
  16. Click ADD SOURCE and select your source dataset, customer generated data in the panel that opens.
  17. Click add processor and add a Type converter processor to the pipeline in order to change the data type of the age field and be able to perform calculations on the field values. The configuration panel opens.
  18. Give a meaningful name to the processor.

    Example

    convert age data type
  19. In the Converters area:
    1. Select .age in the Field path list as you want to change the data type of the values of these specific records.
    2. Select Double in the Output type list as you want to change to the data type from Integer to Double.
  20. Click Save to save your configuration.
  21. (Optional) Look at the preview of the processor to see the data after the type conversion.
    In the Output data preview, the age data type is now double.
  22. Click add processor and add an Aggregate processor to the pipeline in order to calculate the average age of customers. The configuration panel opens.
  23. Give a meaningful name to the processor.

    Example

    calculate average age
  24. In the Operations area:
    1. Select .age in the Field path list as you want to calculate the average value of these specific records.
    2. Select Average in the Operation list.
    3. Enter avg_age in the Output field name field as you want to rename the new generated field.
  25. Click Save to save your configuration.
  26. (Optional) Look at the preview of the processor to see the data after the aggregation operation.
    In the Output data preview, the record of the average age appears in double type.
  27. Click the ADD DESTINATION item on the pipeline to open the panel allowing to select the Apache Pulsar topic in which your output data will be loaded, Customers on Pulsar.
  28. In the Configuration tab of the destination, check the Producer name and select the topic in which the data will be loaded.
    Configuration tab of the destination item.
  29. On the top toolbar of Talend Cloud Pipeline Designer, click the Run button to open the panel allowing you to select your run profile.
  30. Select your run profile in the list (for more information, see Run profiles), then click Run to run your pipeline.

Results

Your pipeline is being executed, the average age data from your local data has been processed and the output flow is sent to the Apache Pulsar topic you have defined.

What to do next

Once the event is published, you can consume the Pulsar message in another pipeline and use it as a source dataset:

A new pipeline where the source is the customer age data from the previous destination pipeline.

Did this page help you?

If you find any issues with this page or its content – a typo, a missing step, or a technical error – please let us know!