Lambda Event Sources: S3, DynamoDB, SQS Triggers
Event sources connect AWS services to Lambda, triggering functions automatically when events occur. S3 bucket uploads, DynamoDB stream changes, and SQS queue messages can invoke Lambda functions, so you can build event-driven workflows without writing your own polling code. Lambda integrates with many AWS services as event sources, from simple file uploads to Application Load Balancer traffic.
What are Event Sources and Why Use Them?
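An event source is any AWS service that produces events Lambda can consume. Sources deliver events in one of two ways: push sources such as S3 and SNS invoke your function directly when something happens (e.g., a file is uploaded to S3), while queue and stream sources such as SQS and DynamoDB Streams are read by the Lambda service itself through an event source mapping, which polls on your behalf. Either way, Lambda runs your function with the event data. This beats running your own poller because:
- Automatic scaling: Lambda scales with event volume; thousands of S3 uploads can trigger thousands of concurrent invocations, up to your account's concurrency limit.
- No infrastructure: you run no polling servers; the source pushes events, or Lambda polls queues and streams for you.
- Cost efficiency: you pay for invocations and compute time, not for idle servers waiting on work.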
S3 Bucket Notifications
When files are uploaded, deleted, or restored in an S3 bucket, Lambda can automatically process them. Create a Lambda function to handle S3 events:
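```python
import urllib.parse

import boto3

# Shared client for real processing helpers (the placeholders below don't use it yet)
s3_client = boto3.client('s3')


def lambda_handler(event, context):
    """
    Handle S3 bucket events: file uploads, deletes, etc.

    event structure (abridged):
    {
        "Records": [
            {
                "s3": {
                    "bucket": {"name": "my-bucket"},
                    "object": {"key": "uploads/photo.jpg", "size": 5000}
                },
                "eventName": "ObjectCreated:Put"
            }
        ]
    }
    """
    records = event.get('Records', [])
    for record in records:
        bucket = record['s3']['bucket']['name']
        # Keys arrive URL-encoded (e.g., a space becomes "+"), so decode before use
        key = urllib.parse.unquote_plus(record['s3']['object']['key'])
        # ObjectRemoved events carry no "size" field
        size = record['s3']['object'].get('size')
        event_type = record['eventName']
        print(f'{event_type}: {key} in {bucket} (size: {size})')

        # Process only newly created objects (e.g., resize image, transcribe audio)
        if not event_type.startswith('ObjectCreated'):
            continue
        if key.lower().endswith(('.jpg', '.png')):
            resize_image(bucket, key)
        elif key.lower().endswith('.mp3'):
            transcribe_audio(bucket, key)

    # S3 invokes the function asynchronously and does not use this return value
    return {'statusCode': 200, 'processed': len(records)}


def resize_image(bucket, key):
    # Placeholder for image resizing logic. Write output to a different bucket
    # or prefix; writing back where the trigger listens re-invokes this function.
    print(f'Resizing {key}...')


def transcribe_audio(bucket, key):
    # Placeholder for transcription logic
    print(f'Transcribing {key}...')
```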
Connect the S3 bucket to Lambda:
- In S3 Console, select your bucket
- Go to Properties → Event notifications → Create event notification
- Event name: lambda-trigger
- Event types: choose s3:ObjectCreated:* and s3:ObjectRemoved:*
- Destination: Lambda Function
- Function: Select your Lambda function
- Click Save
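Now every matching upload or delete invokes your function asynchronously. When you set up the trigger in the S3 console, the console also adds the resource-based permission that lets S3 invoke the function.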
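The console steps above can also be scripted. This is a minimal sketch assuming boto3 with credentials configured; build_s3_notification and connect_bucket are hypothetical helper names, and the bucket, function name, ARN, and account ID are placeholders you supply. The add_permission call grants S3 the invoke permission that the console would otherwise add for you.

```python
def build_s3_notification(function_arn, events=('s3:ObjectCreated:*', 's3:ObjectRemoved:*')):
    """Return a NotificationConfiguration that routes the given S3 events to one function."""
    return {
        'LambdaFunctionConfigurations': [
            {
                'Id': 'lambda-trigger',
                'LambdaFunctionArn': function_arn,
                'Events': list(events),
            }
        ]
    }


def connect_bucket(bucket, function_name, function_arn, account_id):
    """Grant S3 permission to invoke the function, then attach the notification."""
    import boto3  # imported here so build_s3_notification works without the AWS SDK

    boto3.client('lambda').add_permission(
        FunctionName=function_name,
        # Statement IDs allow letters, digits, "-" and "_", so replace dots in bucket names
        StatementId='s3-invoke-' + bucket.replace('.', '-'),
        Action='lambda:InvokeFunction',
        Principal='s3.amazonaws.com',
        SourceArn=f'arn:aws:s3:::{bucket}',
        SourceAccount=account_id,
    )
    # Caution: this replaces any notification configuration already on the bucket
    boto3.client('s3').put_bucket_notification_configuration(
        Bucket=bucket,
        NotificationConfiguration=build_s3_notification(function_arn),
    )
```

Because put_bucket_notification_configuration overwrites the whole configuration, read the existing one with get_bucket_notification_configuration and merge first if the bucket already has notifications.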
DynamoDB Streams
DynamoDB Streams capture item-level changes (inserts, updates, deletes) in a table; Lambda polls the stream and invokes your function with batches of change records. This enables real-time data synchronization, notifications, and cross-service workflows.
Enable streams on a DynamoDB table:
- In DynamoDB Console, select your table
- Go to Exports and streams → DynamoDB Streams → Enable
- Choose New and old images (captures both old and new values)
- Click Enable
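The same setting through the AWS SDK, as a minimal sketch assuming boto3 (stream_spec and enable_stream are hypothetical helper names). The four view types are the values DynamoDB accepts for StreamViewType, and the returned stream ARN is what a Lambda trigger attaches to.

```python
VIEW_TYPES = {'KEYS_ONLY', 'NEW_IMAGE', 'OLD_IMAGE', 'NEW_AND_OLD_IMAGES'}


def stream_spec(view_type='NEW_AND_OLD_IMAGES'):
    """Build the StreamSpecification argument for DynamoDB UpdateTable."""
    if view_type not in VIEW_TYPES:
        raise ValueError(f'unknown stream view type: {view_type}')
    return {'StreamEnabled': True, 'StreamViewType': view_type}


def enable_stream(table_name, view_type='NEW_AND_OLD_IMAGES'):
    """Turn on the table's stream and return the new stream ARN."""
    import boto3  # imported here so stream_spec works without the AWS SDK

    response = boto3.client('dynamodb').update_table(
        TableName=table_name,
        StreamSpecification=stream_spec(view_type),
    )
    return response['TableDescription']['LatestStreamArn']
```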
Create a Lambda function to process stream records:
```python
from decimal import Decimal


def lambda_handler(event, context):
    """
    Process DynamoDB Stream events.

    event structure (abridged):
    {
        "Records": [
            {
                "dynamodb": {
                    "Keys": {"id": {"S": "123"}},
                    "NewImage": {"id": {"S": "123"}, "name": {"S": "Alice"}, "age": {"N": "30"}},
                    "OldImage": {"id": {"S": "123"}, "name": {"S": "Alice"}, "age": {"N": "29"}},
                    "StreamViewType": "NEW_AND_OLD_IMAGES"
                },
                "eventName": "MODIFY"  # INSERT, MODIFY, or REMOVE
            }
        ]
    }
    """
    records = event.get('Records', [])
    for record in records:
        event_name = record['eventName']
        dynamodb = record['dynamodb']
        if event_name == 'INSERT':
            new_item = deserialize_dynamodb(dynamodb.get('NewImage', {}))
            print(f'New item inserted: {new_item}')
            notify_subscribers(f'User {new_item.get("name")} joined')
        elif event_name == 'MODIFY':
            old_item = deserialize_dynamodb(dynamodb.get('OldImage', {}))
            new_item = deserialize_dynamodb(dynamodb.get('NewImage', {}))
            # Detect changes to a specific attribute
            if old_item.get('status') != new_item.get('status'):
                print(f'Status changed: {old_item.get("status")} -> {new_item.get("status")}')
        elif event_name == 'REMOVE':
            old_item = deserialize_dynamodb(dynamodb.get('OldImage', {}))
            print(f'Item deleted: {old_item}')
    return {'statusCode': 200, 'processed': len(records)}


def deserialize_dynamodb(item):
    """Convert scalar DynamoDB attribute values (S, N, BOOL, NULL) to Python values."""
    result = {}
    for key, value in item.items():
        if 'S' in value:
            result[key] = value['S']
        elif 'N' in value:
            # Decimal keeps the exact value; float can silently lose precision
            result[key] = Decimal(value['N'])
        elif 'BOOL' in value:
            result[key] = value['BOOL']
        elif 'NULL' in value:
            result[key] = None
        # Other types (L, M, SS, NS, B, ...) are skipped in this minimal version
    return result


def notify_subscribers(message):
    # Send an SNS notification, write to SQS, etc.
    print(f'Notification: {message}')
```
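deserialize_dynamodb above handles only a few scalar types, so lists, maps, sets, and binary attributes are dropped. Below is a fuller recursive converter, a standalone sketch whose names (from_dynamodb, image_to_dict) are hypothetical. In a Python function, boto3's boto3.dynamodb.types.TypeDeserializer does a similar job; this version avoids the dependency and, like boto3, returns numbers as Decimal to avoid float rounding. It assumes binary values arrive base64-encoded, as they do in the event JSON.

```python
import base64
from decimal import Decimal


def from_dynamodb(value):
    """Convert one DynamoDB attribute value ({"S": "x"}, {"M": {...}}, ...) to Python."""
    [(type_tag, raw)] = value.items()  # each attribute value has exactly one type key
    if type_tag in ('S', 'BOOL'):
        return raw
    if type_tag == 'N':
        return Decimal(raw)
    if type_tag == 'NULL':
        return None
    if type_tag == 'B':
        return base64.b64decode(raw)
    if type_tag == 'SS':
        return set(raw)
    if type_tag == 'NS':
        return {Decimal(n) for n in raw}
    if type_tag == 'BS':
        return {base64.b64decode(b) for b in raw}
    if type_tag == 'L':
        return [from_dynamodb(v) for v in raw]
    if type_tag == 'M':
        return {k: from_dynamodb(v) for k, v in raw.items()}
    raise ValueError(f'unsupported DynamoDB type: {type_tag}')


def image_to_dict(image):
    """Convert a NewImage/OldImage map into a plain dict."""
    return {k: from_dynamodb(v) for k, v in image.items()}
```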
Connect the stream to Lambda:
- In Lambda Console, click Add trigger
- Source: DynamoDB
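- DynamoDB table: select your table
- Starting position: Latest (process only changes made from now on)
- Batch size: 100 (records per invocation; default 100, maximum 10,000)
- Click Add
Lambda now reads the stream and invokes your function with batches of change records. The function's execution role needs permission to read the stream, for example through the AWSLambdaDynamoDBExecutionRole managed policy.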
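The equivalent event source mapping through the SDK, as a minimal sketch assuming boto3 (dynamodb_mapping_args and attach_stream are hypothetical names). Beyond the console defaults it sets two options: ReportBatchItemFailures, so the handler can report which record failed, and MaximumRetryAttempts, because by default Lambda keeps retrying a failing batch until its records expire from the stream (24 hours), blocking that shard meanwhile.

```python
def dynamodb_mapping_args(stream_arn, function_name, batch_size=100, max_retries=3):
    """Build CreateEventSourceMapping kwargs for a DynamoDB stream."""
    if not 1 <= batch_size <= 10_000:
        raise ValueError('DynamoDB Streams batch size must be between 1 and 10,000')
    return {
        'EventSourceArn': stream_arn,
        'FunctionName': function_name,
        'StartingPosition': 'LATEST',  # only changes made after the mapping exists
        'BatchSize': batch_size,
        # Honor {"batchItemFailures": [...]} responses from the handler
        'FunctionResponseTypes': ['ReportBatchItemFailures'],
        # Give up on a failing batch after this many retries instead of waiting for expiry
        'MaximumRetryAttempts': max_retries,
    }


def attach_stream(stream_arn, function_name, **options):
    """Create the mapping and return its UUID."""
    import boto3  # imported here so dynamodb_mapping_args works without the AWS SDK

    response = boto3.client('lambda').create_event_source_mapping(
        **dynamodb_mapping_args(stream_arn, function_name, **options)
    )
    return response['UUID']
```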
SQS Queues
SQS queues decouple producers from consumers. Lambda can poll an SQS queue and process messages in batches, enabling reliable asynchronous processing.
Create an SQS queue:
- In SQS Console, click Create Queue
- Name: my-queue
- Type: Standard (high throughput, best-effort ordering)
- Click Create Queue
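When a queue feeds Lambda, its visibility timeout matters: AWS recommends setting it to at least six times the function's timeout so messages don't reappear while a batch is still being processed or retried. A minimal sketch assuming boto3; queue_attributes and create_queue are hypothetical helper names.

```python
def queue_attributes(function_timeout_s):
    """CreateQueue attributes for a queue consumed by Lambda (SQS expects string values)."""
    # At least 6x the function timeout, per AWS guidance for Lambda consumers
    return {'VisibilityTimeout': str(6 * function_timeout_s)}


def create_queue(name, function_timeout_s):
    """Create a standard queue and return its URL."""
    import boto3  # imported here so queue_attributes works without the AWS SDK

    response = boto3.client('sqs').create_queue(
        QueueName=name, Attributes=queue_attributes(function_timeout_s)
    )
    return response['QueueUrl']
```

For example, a function with a 30-second timeout gets a 180-second visibility timeout.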
Create a Lambda function to process queue messages:
```python
import json


def lambda_handler(event, context):
    """
    Process SQS queue messages.

    event structure (abridged):
    {
        "Records": [
            {
                "messageId": "abc123",
                "body": "{\"user\": \"alice\", \"action\": \"login\"}"
            }
        ]
    }

    Returns a partial batch response. Lambda honors it only when the trigger has
    Report batch item failures (ReportBatchItemFailures) enabled; without it, a
    normal return deletes the whole batch, including messages that failed here.
    """
    successful = []
    failed = []
    for record in event.get('Records', []):
        message_id = record['messageId']
        try:
            # Parse the JSON body
            message = json.loads(record['body'])
            user = message.get('user')
            action = message.get('action')
            print(f'Processing: User {user} performed {action}')
            # Process the message (write to a database, call an external API, etc.)
            process_message(message)
            successful.append(message_id)
        except Exception as e:
            failed.append(message_id)
            print(f'Error processing message {message_id}: {e}')

    print(f'{len(successful)} succeeded, {len(failed)} failed')
    # Lambda deletes every message not listed here; listed messages become
    # visible on the queue again after the visibility timeout and are retried.
    return {'batchItemFailures': [{'itemIdentifier': mid} for mid in failed]}


def process_message(message):
    # Your business logic here
    print(f'Processing: {message}')
```
Connect the queue to Lambda:
- In Lambda Console, click Add trigger
- Source: SQS
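- SQS queue: select your queue
- Batch size: 10 (messages per invocation; the default. Standard queues accept up to 10,000 when a batching window of at least 1 second is also set; FIFO queues allow at most 10)
- Under Additional settings, enable Report batch item failures if your handler returns a partial batch response ({"batchItemFailures": [...]})
- Click Add
The function's execution role also needs permission to receive and delete messages, for example through the AWSLambdaSQSQueueExecutionRole managed policy.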
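The trigger can also be created with the SDK. This minimal sketch assumes boto3; sqs_mapping_args and attach_queue are hypothetical helper names. The validation mirrors the SQS batch rules: standard queues take up to 10,000 messages per batch but need a batching window of at least 1 second above 10, FIFO queues take at most 10, and the batching window maxes out at 300 seconds.

```python
def sqs_mapping_args(queue_arn, function_name, batch_size=10, window_s=0, fifo=False):
    """Build CreateEventSourceMapping kwargs for an SQS queue, enforcing batch-size rules."""
    max_batch = 10 if fifo else 10_000
    if not 1 <= batch_size <= max_batch:
        raise ValueError(f'batch size must be between 1 and {max_batch}')
    if not 0 <= window_s <= 300:
        raise ValueError('batching window must be between 0 and 300 seconds')
    if batch_size > 10 and window_s < 1:
        raise ValueError('batch sizes above 10 need a batching window of at least 1 second')
    args = {
        'EventSourceArn': queue_arn,
        'FunctionName': function_name,
        'BatchSize': batch_size,
        # Honor {"batchItemFailures": [...]} so only failed messages are retried
        'FunctionResponseTypes': ['ReportBatchItemFailures'],
    }
    if window_s:
        args['MaximumBatchingWindowInSeconds'] = window_s
    return args


def attach_queue(queue_arn, function_name, **options):
    """Create the mapping and return its UUID."""
    import boto3  # imported here so sqs_mapping_args works without the AWS SDK

    response = boto3.client('lambda').create_event_source_mapping(
        **sqs_mapping_args(queue_arn, function_name, **options)
    )
    return response['UUID']
```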
Send a test message:
```bash
aws sqs send-message \
  --queue-url https://sqs.us-east-1.amazonaws.com/123456789012/my-queue \
  --message-body '{"user": "alice", "action": "login"}'
```
Lambda long-polls the queue and invokes your function synchronously with batches of messages; when the function returns without error, Lambda deletes the batch from the queue.
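The same send from Python, scaled to many messages: SendMessageBatch accepts at most 10 entries per call and can partially fail, so a producer chunks its messages and checks the Failed list in each response. A sketch assuming boto3; batch_entries and send_all are hypothetical helper names, and each entry's Id only needs to be unique within its batch.

```python
import json


def batch_entries(messages, chunk_size=10):
    """Split dicts into SendMessageBatch entry lists of at most chunk_size entries."""
    entries = [
        {'Id': str(i), 'MessageBody': json.dumps(m)} for i, m in enumerate(messages)
    ]
    return [entries[i:i + chunk_size] for i in range(0, len(entries), chunk_size)]


def send_all(queue_url, messages):
    """Send every message and return the entries SQS rejected."""
    import boto3  # imported here so batch_entries works without the AWS SDK

    sqs = boto3.client('sqs')
    failed = []
    for chunk in batch_entries(messages):
        response = sqs.send_message_batch(QueueUrl=queue_url, Entries=chunk)
        # A batch call can succeed overall while individual entries fail
        failed.extend(response.get('Failed', []))
    return failed
```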
Comparison Table
| Event Source | Typical Latency | Use Case | Records per Invocation |
|---|---|---|---|
| S3 | Seconds | File processing, image resizing | Not batched (typically 1 per event) |
| DynamoDB Streams | <1 second | Real-time replication, notifications | Configurable (1–10,000; default 100) |
| SQS | Seconds | Asynchronous task queues, decoupling | Configurable (standard 1–10,000, FIFO 1–10; default 10) |
| SNS | Milliseconds | Publish-subscribe, fan-out | 1 per message |
| API Gateway | Milliseconds | Synchronous HTTP requests | N/A (one request per invocation) |
Choose based on latency and integration requirements.
Error Handling and Dead Letter Queues
For SQS, configure a Dead Letter Queue (DLQ) to capture failed messages:
- In the SQS console, create a second queue named my-queue-dlq
- Select your main queue and choose Edit
- Under Dead-letter queue, choose Enabled
- Choose my-queue-dlq as the DLQ and set Maximum receives to 3
- Click Save
If a message is received 3 times without being deleted (for example, because processing kept failing), SQS moves it to the DLQ for manual investigation.
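The redrive setting can be applied through the SDK as well. A minimal sketch assuming boto3; redrive_attributes and attach_dlq are hypothetical helper names. The DLQ must be the same type as the source queue (FIFO with FIFO, standard with standard) and live in the same account and region; you can look up its ARN with get_queue_attributes(QueueUrl=..., AttributeNames=['QueueArn']).

```python
import json


def redrive_attributes(dlq_arn, max_receives=3):
    """SetQueueAttributes payload that moves messages to the DLQ after max_receives receives."""
    if not 1 <= max_receives <= 1000:
        raise ValueError('maxReceiveCount must be between 1 and 1000')
    policy = {'deadLetterTargetArn': dlq_arn, 'maxReceiveCount': str(max_receives)}
    # SQS expects the redrive policy as a JSON-encoded string attribute
    return {'RedrivePolicy': json.dumps(policy)}


def attach_dlq(queue_url, dlq_arn, max_receives=3):
    """Attach the dead-letter queue to the source queue."""
    import boto3  # imported here so redrive_attributes works without the AWS SDK

    boto3.client('sqs').set_queue_attributes(
        QueueUrl=queue_url, Attributes=redrive_attributes(dlq_arn, max_receives)
    )
```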
Key Takeaways
- Event sources automatically trigger Lambda when events occur: S3 uploads, DynamoDB changes, SQS messages, etc.
- S3 events enable file processing pipelines; DynamoDB Streams enable real-time synchronization; SQS enables asynchronous task queues.
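- Each source delivers events in its own structure; parse them accordingly (URL-decode S3 object keys, deserialize DynamoDB attribute values, json.loads SQS message bodies).
- Batch processing (multiple records per invocation) reduces the number of invocations and per-invocation overhead, which lowers cost; a batching window can add some latency in exchange.
- Configure batch size, batching window, and (for streams) parallelization factor to balance latency and cost.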
- Use Dead Letter Queues for SQS to capture failed messages for debugging.
Frequently Asked Questions
Can Lambda trigger on specific S3 events (e.g., only .jpg uploads)?
Yes. In S3 Event Notifications, use Prefix and Suffix filters (e.g., prefix: uploads/, suffix: .jpg) to route only matching objects to Lambda.
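Each notification configuration takes at most one prefix rule and one suffix rule, so matching several extensions means one configuration per suffix. A sketch of the resulting NotificationConfiguration, assuming boto3's dictionary shape; s3_filtered_configs is a hypothetical helper, and its output is what you would pass to put_bucket_notification_configuration.

```python
def s3_filtered_configs(function_arn, prefix, suffixes, events=('s3:ObjectCreated:*',)):
    """One LambdaFunctionConfiguration per suffix, each limited to prefix + suffix."""
    configs = []
    for suffix in suffixes:
        configs.append({
            'Id': f'lambda-{suffix.lstrip(".")}',
            'LambdaFunctionArn': function_arn,
            'Events': list(events),
            'Filter': {
                'Key': {
                    'FilterRules': [
                        {'Name': 'prefix', 'Value': prefix},
                        {'Name': 'suffix', 'Value': suffix},
                    ]
                }
            },
        })
    return {'LambdaFunctionConfigurations': configs}
```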
How do I retry failed Lambda invocations from an event source?
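For SQS, failed messages become visible again after the visibility timeout and are retried automatically; Maximum receives on the redrive policy controls how many attempts happen before a message moves to the DLQ. For SQS and DynamoDB Streams, enable Report batch item failures on the trigger and return the failed records' identifiers (message IDs for SQS, sequence numbers for streams); Lambda then retries only those messages (SQS) or resumes from the first failed record (streams). For S3, which invokes Lambda asynchronously, Lambda retries a failed invocation twice by default; configure an on-failure destination or a DLQ on the function to capture events that still fail.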
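For DynamoDB Streams, the identifier Lambda expects is the record's sequence number, and Lambda resumes from that record, so a handler typically stops at the first failure to keep records in order. A standalone sketch; handle_stream_batch and the process callback are hypothetical names, and the response only takes effect when ReportBatchItemFailures is enabled on the trigger.

```python
def handle_stream_batch(records, process):
    """Process DynamoDB stream records in order and build a partial batch response."""
    for record in records:
        sequence_number = record['dynamodb']['SequenceNumber']
        try:
            process(record)
        except Exception as exc:
            print(f'Record {sequence_number} failed: {exc}')
            # Lambda checkpoints before this record and retries from here onward
            return {'batchItemFailures': [{'itemIdentifier': sequence_number}]}
    # An empty list tells Lambda the whole batch succeeded
    return {'batchItemFailures': []}
```

In a handler this becomes return handle_stream_batch(event['Records'], my_process), where my_process is your own per-record function.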
What's the maximum batch size for each event source?
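S3: not batched; each notification typically carries one record. DynamoDB Streams: default 100, maximum 10,000. SQS standard queues: default 10, maximum 10,000 (values above 10 require a batching window of at least 1 second). SQS FIFO queues: maximum 10. SNS: one record per invocation.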
Can I filter events before Lambda is invoked?
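Yes. For SQS, DynamoDB Streams, and other poll-based sources, add Filter criteria to the event source mapping; Lambda evaluates each record and invokes the function only with matching ones. For SQS, messages that don't match are deleted from the queue, so filter only on messages you really want to discard. For S3, use prefix/suffix filters; for SNS, use subscription filter policies. Filtering reduces unnecessary Lambda invocations and saves cost.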
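Filter criteria are JSON patterns matched against fields of each event record. A minimal sketch; filter_criteria is a hypothetical helper, and its result is passed as FilterCriteria to create_event_source_mapping or update_event_source_mapping. Separate patterns are combined with OR: a record is delivered if it matches any of them.

```python
import json


def filter_criteria(*patterns):
    """Build the FilterCriteria argument for an event source mapping."""
    # Each pattern is serialized to a JSON string, as the Lambda API expects
    return {'Filters': [{'Pattern': json.dumps(p)} for p in patterns]}


# DynamoDB Streams: deliver only inserts and modifications
stream_filter = filter_criteria({'eventName': ['INSERT', 'MODIFY']})

# SQS: deliver only messages whose JSON body has "action": "login"
sqs_filter = filter_criteria({'body': {'action': ['login']}})
```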
How do I handle large payloads in event sources?
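S3: the event carries only object metadata, so read large objects selectively (for example, with ranged GET requests) instead of downloading whole files. SQS: store large data in S3 and pass a reference in the SQS message. DynamoDB: store large attributes in S3 and keep a reference in the item. This keeps event payloads small and within service size limits.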
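A sketch of the "store in S3, pass a reference" approach for SQS, often called the claim-check pattern. It assumes callers pass boto3's S3 put_object and get_object (or any functions taking the same keyword arguments); to_message, from_message, the payloads/ prefix, and the 200,000-byte threshold are illustrative choices, not service limits.

```python
import json
import uuid

# Assumed threshold: keep inline bodies well below the SQS maximum message size
MAX_INLINE_BYTES = 200_000


def to_message(payload, bucket, put_object):
    """Return an SQS message body: the payload inline if small, else an S3 reference."""
    body = json.dumps(payload).encode('utf-8')
    if len(body) <= MAX_INLINE_BYTES:
        return json.dumps({'inline': payload})
    key = f'payloads/{uuid.uuid4()}.json'
    put_object(Bucket=bucket, Key=key, Body=body)
    return json.dumps({'s3_bucket': bucket, 's3_key': key})


def from_message(message_body, get_object):
    """Inverse of to_message, for use in the consuming Lambda function."""
    message = json.loads(message_body)
    if 'inline' in message:
        return message['inline']
    response = get_object(Bucket=message['s3_bucket'], Key=message['s3_key'])
    return json.loads(response['Body'].read())
```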