Sat, Apr 22, 2023
Read in 3 minutes
An overview of Step Functions and their elements, how to pass multiple inputs to a Step Functions Map state with a worked scenario, and how to read all objects in an S3 bucket using a Lambda function in Python.
AWS Step Functions is an orchestration service that connects various AWS services such as Lambda, SQS, and DynamoDB.
States are the elements of your state machine, and each state type performs a different function (Task, Choice, Wait, Map, and so on).
For more details on each state type, see the AWS documentation: https://docs.aws.amazon.com/step-functions/latest/dg/concepts-states.html
The scenario: read the objects from S3, where each S3 object is a file containing data, and each row in that file needs to be added to DynamoDB. The pipeline has three parts:
ReadS3ObjectsLambda - lists all the objects in the bucket and passes the object keys as an array into the step function
Step Functions Map state - each element in the array (each object key) is passed to another Lambda
ProcessFileAndUpdateDynamoDBLambda - receives an object key, reads that object line by line, and adds each line into DynamoDB
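The first Lambda, ReadS3ObjectsLambda, simply lists the bucket and returns the keys together with the bucket name: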
import boto3

s3 = boto3.client('s3')

def lambda_handler(event, context):
    keys = []
    # read all the objects from the s3 bucket "testdatabucket" and collect the keys
    resp = s3.list_objects_v2(Bucket='testdatabucket')
    for obj in resp['Contents']:
        keys.append(obj['Key'])
    print("keys are")
    print(keys)
    return {
        'files_array': keys,
        'bucket': 'testdatabucket'
    }
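One caveat: list_objects_v2 returns at most 1,000 keys per call, so a large bucket needs pagination. A minimal sketch of a paginated variant (the helper name list_all_keys is my own, not part of the original code):

import boto3

s3 = boto3.client('s3')

def list_all_keys(bucket):
    # paginate through list_objects_v2 so buckets with more than
    # 1,000 objects are fully covered
    keys = []
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket):
        for obj in page.get('Contents', []):
            keys.append(obj['Key'])
    return keys

Next, the state machine definition in Amazon States Language: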
{
  "Comment": "Step function to read s3 objects and update dynamoDB",
  "StartAt": "ReadS3ObjectsLambda",
  "States": {
    "ReadS3ObjectsLambda": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:<region>:<account-id>:function:ReadS3ObjectsLambda",
      "Next": "Iterating S3 objects"
    },
    "Iterating S3 objects": {
      "Type": "Map",
      "ItemsPath": "$.files_array",
      "Parameters": {
        "bucket.$": "$.bucket",
        "current_file.$": "$$.Map.Item.Value"
      },
      "MaxConcurrency": 1,
      "Iterator": {
        "StartAt": "Process file",
        "States": {
          "Process file": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:<region>:<account-id>:function:ProcessFileAndUpdateDynamoDBLambda",
            "Next": "Wait",
            "Retry": [
              {
                "ErrorEquals": [
                  "Lambda.ServiceException",
                  "Lambda.AWSLambdaException",
                  "Lambda.SdkClientException"
                ],
                "IntervalSeconds": 1,
                "MaxAttempts": 2,
                "BackoffRate": 1
              }
            ],
            "Catch": [
              {
                "ErrorEquals": [
                  "States.ALL"
                ],
                "ResultPath": null,
                "Next": "Process file Failed"
              }
            ]
          },
          "Wait": {
            "Type": "Wait",
            "Seconds": 30,
            "End": true
          },
          "Process file Failed": {
            "Type": "Pass",
            "End": true
          }
        }
      },
      "ResultPath": null,
      "End": true
    }
  }
}
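With the Parameters block above, every Map iteration invokes ProcessFileAndUpdateDynamoDBLambda with one key from files_array; $$.Map.Item.Value is the current array element. The event each iteration receives looks roughly like this (the file name is illustrative):

event = {
    'bucket': 'testdatabucket',
    'current_file': 'some-object-key.json'  # one element of files_array
}

The second Lambda consumes that event: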
import json
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client('s3')
boto3_ddb_client = boto3.client('dynamodb')
# Lambda handler receives the bucket and current_file from the Map state,
# fetches the file from S3, and processes each line to update DynamoDB.
def lambda_handler(event, context):
    count = 0
    result = s3.get_object(Bucket=event['bucket'], Key=event['current_file'])
    for line in result['Body'].read().splitlines():
        each_line = line.decode('utf-8')
        print(each_line)
        update_ddb_row(each_line.strip())
        count += 1
    return count
# update one DynamoDB row
def update_ddb_row(row_text):
    try:
        part_key, range_key = get_keys(row_text)
        boto3_ddb_client.update_item(
            TableName='testTable',
            Key={
                'product_partition_key': {'S': part_key},
                'product_range_key': {'S': range_key},
            },
            AttributeUpdates={
                'comment': {'Value': {'S': 'test comment updated by lambda'}},
            },
            ReturnValues="UPDATED_NEW"
        )
    except ClientError as e:
        print(f"update failed for row {row_text}: {e}")
        # re-raise so the state machine's Retry/Catch can handle the failure
        raise
# parse one line of the file and extract the partition key and range key
def get_keys(raw_file_line):
    js = json.loads(raw_file_line.strip())
    this_partition_key = js["product_partition_key"]["S"]
    this_range_key = js["product_range_key"]["S"]
    return this_partition_key, this_range_key
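get_keys assumes each line of the file is DynamoDB-style JSON. A quick local check with a made-up row (hypothetical values) would be:

# sample line with hypothetical key values, matching the format get_keys expects
sample = '{"product_partition_key": {"S": "prod-001"}, "product_range_key": {"S": "2023-01-01"}}'
print(get_keys(sample))  # -> ('prod-001', '2023-01-01')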