Lambdaでイベントソースマッピング(トリガー)を使用してSQSのメッセージを受信する
全体図
3回に分けてSQSを使用した疎結合なシステムを構築していきます。
今回はその2回目となります。
前回は下記を参照ください。
今回のゴール
第2回目となる今回は、LambdaでSQSのメッセージを取得してDynamoDBに書き込みの処理(冪等性の処理)を行います。 (上記図中の赤枠の部分)
SQSとLambdaとの間にイベントソースマッピング(トリガー)を作成する事によりLambdaサービスがSQSのキューをポーリングして、受信可能なメッセージがあればLambda関数が呼び出されるようになります。
LambdaはユーザーのVPCで実行されるのではなく、AWSのlambdaサービスのVPC上で実行されます。
Lambda関数とSQSキューが直接通信するわけではありませんのでご注意ください。
イベントソースのマッピングとは、イベントソースからを読み取り、Lambda 関数を呼び出す AWS Lambda リソースのことです。イベントソースマッピングを使用して、Lambda 関数を直接呼び出さないサービスのストリームまたはキューから項目を処理できます。Lambda は、次のサービスのイベントソースマッピングを提供します。
https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/invocation-eventsourcemapping.html
Lambda はキューをポーリングし、Lambda 関数を、キューメッセージを含むイベントと共に同期的に呼び出します。Lambda はメッセージをバッチで読み取り、バッチごとに、一度に関数を呼び出します。
https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-sqs.html
例えば、ユーザーのVPCエンドポイントを経由する事ができません。
メッセージの受信にエンドポイントを経由する必要がある時は、イベントソースマッピングを使用せずにLambdaをユーザーのVPCにアクセスできる設定をして、SQSポーリング機能を実装する必要があります。
また、イベントソースマッピングを使用すると、Lambda関数が正常終了すると、メッセージを自動で消去してくれます。
メッセージを再処理したい時は例外を発生させます。
例えば、バッチサイズ3としたときに、最大でLambda関数で3つのメッセージを取得する事ができるのですが、そのうち1つのメッセージのみを再実行させたいような事ができません。(正確にいうと推奨されません。)
ですので、Lambda関数は冪等性を実装するようにします。
DynamoDBの作成
以前の記事で使用したDynamoDBを使用しますので、下記の記事中のDynamoDBと、Lambdaにアタッチするポリシー ”Allow_DynamoDB_Idempotent_test_putitem”を作成します。
Lambdaの作成
メッセージを受信するLambda関数を作成します。
[関数名]:test-sqs-receive
[ランタイム]:Python 3.9
自動作成されたロールにポリシーをアタッチ
SQS関連のポリシーをアタッチします。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:GetQueueAttributes"
],
"Resource": "arn:aws:sqs:ap-northeast-1:(your AWS Account ID):test-queue.fifo"
}
]
}
SQSをKMSのCMKで暗号化しておりますので、そのポリシーもアタッチしました。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"kms:Decrypt",
"kms:GenerateDataKey*"
],
"Resource": "(your KMS key ARN)"
}
]
}
冪等性の実装として以前の記事のDynamoDBを使用しましたので、その時作成した下記のポリシー(Allow_DynamoDB_Idempotent_test_putitem)をアタッチしました。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"dynamodb:PutItem",
"dynamodb:DeleteItem"
],
"Resource": "arn:aws:dynamodb:ap-northeast-1:(your account ID):table/Idempotent-test"
}
]
}
関数のソースコード
Lambda関数のソースコードは、”Lambdaの冪等性を実装する”で使用したソースを使用しました。
import boto3
import json
import botocore.exceptions
TBL_NAME = 'Idempotent-test'
dynamodb = boto3.client('dynamodb')
def lock(transaction_id: str):
try:
dynamodb.put_item(
TableName=TBL_NAME,
Item={"transaction_id": {"S": transaction_id}},
Expected={'transaction_id': {'Exists': False}}
)
return True
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
return False
else:
raise e
except Exception as e:
raise e
def lambda_handler(event, context):
try:
for record in event['Records']:
transaction_id = record['attributes']['MessageDeduplicationId']
print(transaction_id)
if(lock(transaction_id)):
try:
# メイン処理
print(record)
print('done!')
except Exception as e:
dynamodb.delete_item(
TableName=TBL_NAME,
Key={"transaction_id": {"S": transaction_id}}
)
raise e
else:
print('nothing to do')
except botocore.exceptions.ClientError as e:
print('critical error1')
print(e)
raise e
except Exception as e:
print('critical error2')
print(e)
raise e
トリガーの追加
Lambda関数の「トリガーの追加」ボタンよりトリガーを作成します。
SQSを選択して、SQSのキューを選択します。
確認
前回作成したsqsSend.phpでSQSにメッセージを送信して、メッセージがLambdaで処理され、メッセージが削除されたことを確認します。
次回
次回は、「SQSで正常に処理できないメッセージをデッドレターキューに移動してSNSで通知する」を構築していきます。