How to Build a Robust Dynamic Scheduled Task (Cron) Service in AWS Using SNS, Cloudwatch, Lambda, and DynamoDB?
We have been developing a SaaS-based CRM system using AWS serverless architecture using Gateway API, Lambda, Cloudwatch, S3, and DynamoDB for most of our system. A lot of things have been said related to the advantages of using a serverless architecture so I would focus on the requirements and solution for a dynamic scheduler task service.
What were the requirements?
There was a recent requirement to support a dynamic scheduling task service that can be configured based on consumer data. For example, whenever a lead is created in a system then the sales department should be able to create a reminder based on their discussion. There were additional requirements from the management to track the execution of those scheduled tasks and the system should execute a task in the background. Also, they want to support this for thousands of entries in the system.
Amazon CloudWatch Events service allows us to create an alarm that can be triggered based on the time but our challenge was that we need to set an alarm dynamically. We also want to set the dynamic data inside that scheduled job so we were looking to build a system that can:
- Creates a scheduling task based on the input
- Process the data associated with the task
How did we solve it?
Our system using DynamoDB as our persistent layer and we took the advantage of DynomoDB datastream to trigger an event based on insertion or deletion of the entry. We associate that event with a lambda function and that creates a cloudwatch event (Alarm with parameters ) for every entry in the database. Finally, an alarm triggers an SNS service, and that triggers a final lambda function to process the final business logic.
FYI, we tried to call a Lambda function from the cloudwatch event but it was not scalable and started throwing exceptions after 50 entries.
So we were able to achieve our requirements by:
- We can track scheduled tasks using cloudwatch logs
- We can use dynamic data and process it in the lambda function
How does it work?
This architecture is scalable and supports multiple scheduled tasks. The process flow works like this :
- Save an entry in the dynamodb table with a cron syntax(rate( 2 minutes)) i.e. if you want to create a recurring or single task then add an extra column for it.
- It triggers(dynamo stream) a lambda function that creates a cloudwatch rule for this entry.
- An alarm triggers on an SNS topic with data entry
- A final lambda function is triggered and it can utilize the data entry from dynamodb table to process its business logic
The advantage of using AWS is that you can find almost all the solutions in their services and we just need to configure and use them with a few tweaks.
Show me the code
We need to create a few functions in our lambda function to create cloudwatch rule for scheduled task:
function cloudWatchPutRule(ruleName, cronExpression) {
return new Promise((resolve, reject) => {
console.log("Adding a rule: " + ruleName + ": " + cronExpression);
const cloudWatchEventParams = {
Name: ruleName,
ScheduleExpression: cronExpression,
State: 'ENABLED'
}cloudWatchEvents.putRule(cloudWatchEventParams, function(err, data) {
if (err) {
console.log('Cloud Watch Put Rule Error', err)
reject(err)
} else {
console.log('Cloud Watch Put Rule Success', data.RuleArn)
resolve(data.RuleArn)
}
})
})
}
We also need to pass the parameters to the scheduled job so we need a function to add a target
function cloudWatchAddTargets(ruleName, data, topicArn, cronCloudWatchEventsTarget) {
return new Promise((resolve, reject) => {
console.log("Adding a target");
const targetParams = {
Rule: ruleName,
Targets: [{
Arn: topicArn,
Id: cronCloudWatchEventsTarget,
Input: JSON.stringify(data)
}]
};cloudWatchEvents.putTargets(targetParams, function(err, data) {
if (err) {
console.log("Add Target Error", err);
reject(err)
} else {
console.log("Add Target Success", data);
resolve(data)
}
});
})}
We need to call these functions from our lambda function
module.exports.watchCronJobs = async (event, context, callback) => {
return new Promise(async (resolve, reject) => {
event.Records.forEach(function(record) {
console.log("Processing:", record);
var topicArn = process.env.JOBS_TOPIC_ARN;
if (record.eventName == 'INSERT') {
var ruleName = "Rule_" + record.dynamodb.NewImage.leadSourceId.S;
var cronExpression = record.dynamodb.NewImage.cronExpression.S;
var cronCloudWatchEventsTarget = "CronCloudWatchEventsTarget_" + record.dynamodb.NewImage.leadSourceId.S
var data = record.dynamodb.NewImage;
return new Promise(async (resolve, reject) => {
const ruleArn = await cloudWatchPutRule(ruleName, cronExpression).catch(err => {
reject(err)
return
})await cloudWatchAddTargets(ruleName, data, topicArn, cronCloudWatchEventsTarget).catch(err => {
reject(err)
return
})
resolve(true)
});
}
});
});
};
Finally, we need to join all the pieces together and we can use a serverless framework for it.
functions:
syncDataSource:
handler: handler.syncDataSource
events:
- sns:
arn:
Ref: JobsTopic
topicName: JobsTopic
watchCronJobs:
handler: handler.watchCronJobs
events:
- stream:
type: dynamodb
arn:
'Fn::GetAtt':
- JobsTable
- StreamArn
resources:
Resources:
JobsTable:
Type: 'AWS::DynamoDB::Table'
Properties:
TableName: JobsTable
AttributeDefinitions:
- AttributeName: leadSourceId
AttributeType: S
KeySchema:
- AttributeName: leadSourceId
KeyType: HASH
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
JobsTopic:
Type: 'AWS::SNS::Topic'
Properties:
TopicName: JobsTopic
SnsTopicPolicy:
Type: 'AWS::SNS::TopicPolicy'
Properties:
PolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: SnsTopicPolicy
Effect: Allow
Principal:
Service: events.amazonaws.com
Action: 'sns:Publish'
Resource: '*'
Topics:
- Ref: JobsTopic
Now deploy the solution on your AWS infrastructure :)
serverless deploy -v
Test the above solution from your command line
aws dynamodb put-item --table-name JobsTable --item '{"leadSourceId": {"S": "20"}, "cronExpression": {"S": "rate(2 minutes)"}}'
Source Code:
We would contribute this solution to others so that they can reuse or improve it. You can download or fork the code from git repository.
I hope the above solution helps you to solve your problem and I would really appreciate it if I could get a clap for this article.