Introduction
Here we would implement the last, but not the least part of our application - notifying customers that they should come to the place that they waited for: It has two lambda functions:
- Product - sends waitees that should be notified into the SQS.
- Consumer - processes SQS and sends notifications to the waitees.
Probably your first thoughts are that we could send notifications directly from the producer function itself.
The problem here is that notify
operation is heavy and it could take too much time to send the notification to all waitees (maximum lambda execution time per request is 15 minutes).
This approach is not scalable and it would be a bottleneck.
Moving process notification functionality to separate function would be much more scalable.
For these two lambda functions, I have chosen NodeJs runtime with typescript which is more comfortable for engineers who got used to static typed languages.
NodeJs runtime gives us better cold start comparing to java
and much smaller package to deploy.
Initial project was generated using serverless framework: serverless create --template aws-nodejs-typescript
.
The same as before we have common serverless.yaml
for both lambda functions:
service: WaitlistAppNotificationLambdas
provider:
name: aws
runtime: nodejs8.10
stage: dev
region: us-west-2
memorySize: 128
- Runtime is set to
nodejs8.10
as it the latest version supported by AWS. - Memory size is set to minimum 128 which is enough for us.
Custom section:
custom:
notifyAtIndexName: NotifyAtIndex
notificationQueueName: NotificationQueue
waiteesTableName: Waitees
waiteesTableArn: WaitlistAppCrudLambdas-${self:provider.stage}.WaiteesTable
Let’s define SQS for our notifications:
NotificationQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:custom.notificationQueueName}
RedrivePolicy:
deadLetterTargetArn:
Fn::GetAtt:
- DeadNotificationQueue
- Arn
maxReceiveCount: 3
DeadNotificationQueue:
Type: AWS::SQS::Queue
RedrivePolicy
is specified with defined DLQ and receiveCount set to 3.
First we define Waitee
domain model:
interface Waitee extends AttributeMap {
id: string
waitlistId: string
name: string
phoneNumber: string
}
- It extends
AttributeMap
and with that we are able to convert DynamoDb query output to waitees objects array.
After logic of the producer
lambda function:
const notifyAt = new Date();
notifyAt.setMinutes(notifyAt.getMinutes() + 15, 0, 0);
const queryWaiteesParams: QueryInput = {
TableName: waiteesTableName,
IndexName: notifyAtIndexName,
KeyConditionExpression: "notifyAt = :notifyAt",
ProjectionExpression: "id, waitlistId, #name, phoneNumber",
ExpressionAttributeNames: {
"#name": "name"
},
ExpressionAttributeValues: {
":notifyAt": notifyAt.getTime()
}
};
const waiteesQuery = await documentClient.query(queryWaiteesParams).promise();
const waiteesCount = waiteesQuery.Count || 0;
It gets all the waitees from the database who should be near the waiting place in 15 minutes.
Then these waitees are sent into the notification queue by batches:
const waitees = waiteesQuery.Items as Waitee[] || [];
let batchEntries = [];
for (let i = 0; i < waiteesCount; i++) {
const waitee = waitees[i];
const batchEntry: SendMessageBatchRequestEntry = {
Id: waitee.id,
MessageBody: JSON.stringify({
"id": waitee.id,
"waitlistId": waitee.waitlistId,
"name": waitee.name,
"phoneNumber": waitee.phoneNumber
})
};
batchEntries.push(batchEntry);
if (batchEntries.length == 10 || i == waiteesCount - 1) {
const batchRequest: SendMessageBatchRequest = {
QueueUrl: queueUrl,
Entries: batchEntries
};
const response = await sqs.sendMessageBatch(batchRequest).promise();
if (response.$response.httpResponse.statusCode != 200) {
console.log("Failed to send batch: ", response.$response.error);
} else {
console.log("Sent batch at ", notifyAt);
}
batchEntries = [];
}
}
Now let’s switch to consumer
lambda function with the next logic:
for (const record of event.Records) {
const message: WaiteeMessage = JSON.parse(record.body);
const updateWaiteeParams: UpdateItemInput = {
TableName: waiteesTableName,
Key: {
"id": message.id,
"waitlistId": message.waitlistId
},
UpdateExpression: "set notifiedAt = :notifiedAt",
ExpressionAttributeValues: {
":notifiedAt": new Date().getTime()
},
ReturnValues: "NONE"
};
const updateResult = await documentClient.update(updateWaiteeParams).promise()
if (updateResult.$response.httpResponse.statusCode != 200) {
console.log("Error occurred while updating waitee ", message.id);
}
console.log("Updated waitee ", message.id);
}
What it does is just processes messages from the notification queue and marks them as sent.
Now definition of both lambda functions which is the most interesting thing here:
functions:
NotificationProducerHandler:
handler: src/NotificationProducerHandler.producer
role: NotificationLambdaRole
events:
- schedule:
rate: rate(1 minute)
enabled: true
environment:
waiteesTableName: ${self:custom.waiteesTableName}
notifyAtIndexName: ${self:custom.notifyAtIndexName}
notificationQueueName: ${self:custom.notificationQueueName}
NotificationConsumerHandler:
handler: src/NotificationConsumerHandler.consumer
role: NotificationLambdaRole
events:
- sqs:
batchSize: 10
arn:
Fn::GetAtt:
- NotificationQueue
- Arn
environment:
waiteesTableName: ${self:custom.waiteesTableName}
- Producer function runs periodically every minute
rate(1 minute)
. - Consumer function is triggered by notification queue and processes batches with size 10 (maximum allowed).
Build & Deploy
To deploy our lambda functions we execute sls deploy --verbose
and see next output:
- Size of the .zip file was 1.75 Mb
- Two separated lambda function were deployed.
From the logs we see that producer
function is triggered approximately every minute.
06:51:34 2018-11-18T06:51:34.005Z 61f18166-eafe-11e8-8f6c-41b5dd9c4be5 Started process at 2018-11-18T06:51:33.969Z
06:52:33 2018-11-18T06:52:33.260Z 8620818a-eafe-11e8-aa97-b3b240d06d20 Started process at 2018-11-18T06:52:33.260Z
06:53:33 2018-11-18T06:53:33.184Z a9e881e8-eafe-11e8-8017-070a571e89fc Started process at 2018-11-18T06:53:33.184Z
06:54:32 2018-11-18T06:54:32.846Z cd905140-eafe-11e8-8f8f-f1027bf247f0 Started process at 2018-11-18T06:54:32.846Z
06:55:32 2018-11-18T06:55:32.586Z f131b742-eafe-11e8-9455-bdfb0a1eaa74 Started process at 2018-11-18T06:55:32.586Z
06:56:32 2018-11-18T06:56:32.146Z 14b24f4c-eaff-11e8-b00d-a9ecf69c78b9 Started process at 2018-11-18T06:56:32.146Z
When we found waitees in the database we sent them to SQS.
06:54:32 START RequestId: cd905140-eafe-11e8-8f8f-f1027bf247f0 Version: $LATEST
06:54:32 2018-11-18T06:54:32.846Z cd905140-eafe-11e8-8f8f-f1027bf247f0 Started process at 2018-11-18T06:54:32.846Z
06:54:32 2018-11-18T06:54:32.927Z cd905140-eafe-11e8-8f8f-f1027bf247f0 Received 1 waitees from db at 2018-11-18T07:09:00.000Z
06:54:33 2018-11-18T06:54:33.086Z cd905140-eafe-11e8-8f8f-f1027bf247f0 Sent batch at 2018-11-18T07:09:00.000Z
06:54:33 2018-11-18T06:54:33.086Z cd905140-eafe-11e8-8f8f-f1027bf247f0 Finished process at 2018-11-18T06:54:33.086Z
06:54:33 END RequestId: cd905140-eafe-11e8-8f8f-f1027bf247f0
06:54:33 REPORT RequestId: cd905140-eafe-11e8-8f8f-f1027bf247f0 Duration: 391.48 ms Billed Duration: 400 ms Memory Size: 128 MB
Then we processed them almost instantly by consumer
function.
06:54:33 START RequestId: 025736f4-ec2e-59f9-a07a-629fe2f03760 Version: $LATEST
06:54:33 2018-11-18T06:54:33.726Z 025736f4-ec2e-59f9-a07a-629fe2f03760 Started process at 2018-11-18T06:54:33.726Z
06:54:34 2018-11-18T06:54:34.709Z 025736f4-ec2e-59f9-a07a-629fe2f03760 Updated waitee cccff429
06:54:34 2018-11-18T06:54:34.709Z 025736f4-ec2e-59f9-a07a-629fe2f03760 Finished process at 2018-11-18T06:54:34.709Z
06:54:34 END RequestId: 025736f4-ec2e-59f9-a07a-629fe2f03760
06:54:34 REPORT RequestId: 025736f4-ec2e-59f9-a07a-629fe2f03760 Duration: 1218.38 ms Billed Duration: 1300 ms Memory Size: 128 MB
Conclusion
Using typescript
was easy and fun for me. I got the knowledge at least to build similar lambda function in future.
There are always a few possible solutions and you should always analyze your needs before making any decisions.
Everything could be found on my github repository.