Create a Java Lambda function
This procedure uses the AWS Toolkit for Eclipse to create the Lambda function.
Download and set up the AWS Toolkit for Eclipse on your laptop by following the instructions at https://aws.amazon.com/en/eclipse/.
Once configured, use Eclipse to create a new Lambda Java project:
Procedure
- Click File > New > Project.
- In the New Project Wizard, click AWS > AWS Lambda Java Project.
- Click Next.
- Set the following values:
  - Project Name = TalendLambdaProject
  - Package Name = com.talend.lambda.handlers
  - Class Name = S3LambdaFunctionHandler
  - Output type = Object
- Click Finish.
  The README guide is displayed. Take some time to read it and get familiar with the steps to create, develop, and deploy a Lambda function.
- In the Project Explorer view, navigate to TalendLambdaProject > src > com.talend.lambda.handlers.
- Double-click S3LambdaFunctionHandler.java to open it. Basic sample code has been generated for you.
- Remove all generated content and replace it with the code below:
package com.talend.lambda.handlers;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;

import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.s3.event.S3EventNotification.S3EventNotificationRecord;
import com.fasterxml.jackson.databind.ObjectMapper;

public class S3LambdaFunctionHandler implements RequestHandler<S3Event, Object> {

    @Override
    public Object handleRequest(S3Event pS3Event, Context context) {
        // TAC connection settings are read from the Lambda environment variables
        final String v_tac_endpoint = System.getenv("tac_endpoint");
        final String v_tac_username = System.getenv("tac_username");
        final String v_tac_password = System.getenv("tac_password");
        final String v_tac_task_name = System.getenv("tac_task_name");

        Integer vTaskId = null;
        String v_bucket = null;
        String v_object = null;
        CloseableHttpClient v_http_client = null;
        CloseableHttpResponse v_response = null;
        S3EventNotificationRecord v_record = null;

        // To keep things simple for this demo,
        // we only handle the first S3 event notification in the list
        List<S3EventNotificationRecord> v_liste_records = pS3Event.getRecords();
        if (v_liste_records != null && v_liste_records.size() > 0) {
            v_record = v_liste_records.get(0);
        }

        if (v_record != null) {
            v_bucket = v_record.getS3().getBucket().getArn();
            context.getLogger().log("bucket = " + v_bucket);
            v_object = v_record.getS3().getObject().getKey();
            context.getLogger().log("file = " + v_object);
        }

        // if an object has been uploaded in S3, call TAC Metaservlet to trigger
        // Talend Job
        if (v_object != null) {
            v_http_client = HttpClients.createDefault();

            // get the tac task id by name
            String v_taskid_request = "{\"actionName\":\"getTaskIdByName\",\"authPass\":\"" + v_tac_password
                    + "\",\"authUser\":\"" + v_tac_username
                    + "\",\"taskName\":\"" + v_tac_task_name + "\"}";
            // note: this logs the full request, including the TAC password
            context.getLogger().log("Request being sent to Talend Administration Center : " + v_taskid_request);

            // encode the metaservlet request in base64
            String v_taskid_request_base64 = Base64.getEncoder()
                    .encodeToString(v_taskid_request.getBytes(StandardCharsets.UTF_8));
            HttpGet v_http_get_taskid_request = new HttpGet(
                    v_tac_endpoint + "/metaServlet?" + v_taskid_request_base64);

            try {
                // send the HTTP request to remote TAC
                v_response = v_http_client.execute(v_http_get_taskid_request);

                // log response received from TAC
                String v_json = EntityUtils.toString(v_response.getEntity());
                context.getLogger().log("Response from Talend Administration Center : " + v_json);

                // Extract response
                ObjectMapper vObjectMapper = new ObjectMapper();
                TacResponse response = vObjectMapper.readValue(v_json, TacResponse.class);
                if (response != null) {
                    vTaskId = response.taskId;
                }
            } catch (ClientProtocolException e_ClientProtocolException) {
                context.getLogger().log(e_ClientProtocolException.toString());
            } catch (IOException e_IOException) {
                context.getLogger().log(e_IOException.toString());
            }
        }

        // if the task id has been retrieved
        if (vTaskId != null) {
            String v_tac_run_task_request = "{\"actionName\":\"runTask\",\"authPass\":\"" + v_tac_password
                    + "\",\"authUser\":\"" + v_tac_username
                    + "\",\"taskId\":" + vTaskId
                    + ",\"mode\":\"asynchronous\""
                    + ",\"context\":{\"s3_file\":\"" + v_object + "\"}"
                    + "}";
            // note: this logs the full request, including the TAC password
            context.getLogger().log("Request being sent to Talend Administration Center : " + v_tac_run_task_request);

            // encode the metaservlet request in base64
            String v_request_base64 = Base64.getEncoder()
                    .encodeToString(v_tac_run_task_request.getBytes(StandardCharsets.UTF_8));

            // construct an HTTP request
            HttpGet v_http_get_request = new HttpGet(v_tac_endpoint + "/metaServlet?" + v_request_base64);

            try {
                // send the HTTP request to remote TAC
                v_response = v_http_client.execute(v_http_get_request);

                // log response received from TAC
                context.getLogger()
                        .log("Response from Talend Administration Center : " + EntityUtils.toString(v_response.getEntity()));
            } catch (ClientProtocolException e_ClientProtocolException) {
                context.getLogger().log(e_ClientProtocolException.toString());
            } catch (IOException e_IOException) {
                context.getLogger().log(e_IOException.toString());
            }
        }

        // release the HTTP client, if one was created
        if (v_http_client != null) {
            try {
                v_http_client.close();
            } catch (IOException e_IOException) {
                context.getLogger().log(e_IOException.toString());
            }
        }

        return null;
    }
}
- Save your changes.
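The handler sends each request to the TAC MetaServlet as a JSON string that is Base64-encoded and appended to the tac_endpoint value followed by /metaServlet?. The sketch below isolates that encoding step so it can be tried outside Lambda. The class name MetaServletUrlSketch, its two methods, the endpoint URL, and the credentials are all hypothetical placeholders and are not part of the generated project.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for illustration only; not part of the generated project.
class MetaServletUrlSketch {

    // Appends the Base64-encoded JSON request to <endpoint>/metaServlet?,
    // mirroring what the handler does inline.
    static String buildUrl(String tacEndpoint, String jsonRequest) {
        String encoded = Base64.getEncoder()
                .encodeToString(jsonRequest.getBytes(StandardCharsets.UTF_8));
        return tacEndpoint + "/metaServlet?" + encoded;
    }

    // Decodes the part after the first '?' back into the JSON request.
    static String decodeRequest(String url) {
        String encoded = url.substring(url.indexOf('?') + 1);
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Placeholder endpoint and credentials (assumptions, replace with your own).
        String json = "{\"actionName\":\"getTaskIdByName\",\"authPass\":\"secret\","
                + "\"authUser\":\"user@example.com\",\"taskName\":\"my_task\"}";
        String url = buildUrl("https://tac.example.com/tac", json);
        System.out.println(url);
        System.out.println(decodeRequest(url));
    }
}
```

Decoding the query part of a URL in this way can help when comparing what the function sent with what appears in the function logs.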
Eclipse now reports an error in the Lambda class because the TacResponse class does not exist yet. Create it as follows:
- Right-click on the package com.talend.lambda.handlers.
- Choose New > Class.
- In the New Java Class dialog, set the following values:
  - Source folder = TalendLambdaProject/src
  - Package = com.talend.lambda.handlers
  - Name = TacResponse
  - Modifiers = public
  - Superclass = java.lang.Object
  - All other fields: leave as default.
- Click Finish to create the new class.
- Open the class, remove the default content, then paste in the code below:
package com.talend.lambda.handlers;

import com.fasterxml.jackson.annotation.JsonIgnoreType;

public class TacResponse {

    @JsonIgnoreType
    public static class ExecutionTimeType {
        public long millis;
        public String seconds;
    }

    public Integer returnCode;
    public Integer taskId;
    public ExecutionTimeType executionTime;
}
- Save changes.
- Check both S3LambdaFunctionHandler.java and TacResponse.java for errors. Neither class should report any errors now.
- Right-click on the package com.talend.lambda.handlers.
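The handler reads its TAC connection settings from four environment variables: tac_endpoint, tac_username, tac_password, and tac_task_name. If any of them is unset, System.getenv returns null and that null is concatenated into the request. A minimal sketch of a fail-fast check is shown below; the class TacSettingsCheckSketch and its findMissing method are hypothetical names introduced for illustration and are not part of the generated project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the generated project.
class TacSettingsCheckSketch {

    // Variable names read by S3LambdaFunctionHandler via System.getenv(...).
    static final String[] REQUIRED = {
        "tac_endpoint", "tac_username", "tac_password", "tac_task_name"
    };

    // Returns the names of required variables that are absent or blank,
    // in the order they are listed in REQUIRED.
    static List<String> findMissing(Map<String, String> env) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Checks the environment of the current process.
        List<String> missing = findMissing(System.getenv());
        if (missing.isEmpty()) {
            System.out.println("All TAC settings are present.");
        } else {
            System.out.println("Missing TAC settings: " + missing);
        }
    }
}
```

A check like this could be called at the start of handleRequest so that a misconfigured function logs the missing variable names instead of sending a malformed request.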