AWS SQSを使用した疎結合なシステムを構築②


Lambdaでイベントソースマッピング(トリガー)を使用してSQSのメッセージを受信する

全体図

3回に分けてSQSを使用した疎結合なシステムを構築していきます。

今回はその2回目となります。

前回は下記を参照ください。

今回のゴール

第2回目となる今回は、LambdaでSQSのメッセージを取得してDynamoDBに書き込みの処理(冪等性の処理)を行います。 (上記図中の赤枠の部分)

SQSとLambdaとの間にイベントソースマッピング(トリガー)を作成する事によりLambdaサービスがSQSのキューをポーリングして、受信可能なメッセージがあればLambda関数が呼び出されるようになります。

LambdaはユーザーのVPCで実行されるのではなく、AWSのlambdaサービスのVPC上で実行されます。

Lambda関数とSQSキューが直接通信するわけではありませんのでご注意ください。

イベントソースのマッピングとは、イベントソースからを読み取り、Lambda 関数を呼び出す AWS Lambda リソースのことです。イベントソースマッピングを使用して、Lambda 関数を直接呼び出さないサービスのストリームまたはキューから項目を処理できます。Lambda は、次のサービスのイベントソースマッピングを提供します。

https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/invocation-eventsourcemapping.html

Lambda はキューをポーリングし、Lambda 関数を、キューメッセージを含むイベントと共に同期的に呼び出します。Lambda はメッセージをバッチで読み取り、バッチごとに、一度に関数を呼び出します。

https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-sqs.html

例えば、ユーザーのVPCエンドポイントを経由する事ができません。

メッセージの受信にエンドポイントを経由する必要がある時は、イベントソースマッピングを使用せずにLambdaをユーザーのVPCにアクセスできる設定をして、SQSポーリング機能を実装する必要があります。

また、イベントソースマッピングを使用すると、Lambda関数が正常終了すると、メッセージを自動で消去してくれます。

メッセージを再処理したい時は例外を発生させます。

例えば、バッチサイズ3としたときに、最大でLambda関数で3つのメッセージを取得する事ができるのですが、そのうち1つのメッセージのみを再実行させたいような事ができません。(正確にいうと推奨されません。)

ですので、Lambda関数は冪等性を実装するようにします。

DynamoDBの作成

以前の記事で使用したDynamoDBを使用しますので、下記の記事中のDynamoDBと、Lambdaにアタッチするポリシー ”Allow_DynamoDB_Idempotent_test_putitem”を作成します。

Lambdaの作成

メッセージを受信するLambda関数を作成します。

[関数名]:test-sqs-receive
[ランタイム]:Python 3.9

自動作成されたロールにポリシーをアタッチ

SQS関連のポリシーをアタッチします。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ReceiveMessage",
                "sqs:GetQueueAttributes"
            ],
            "Resource": "arn:aws:sqs:ap-northeast-1:(your AWS Account ID):test-queue.fifo"
        }
    ]
}

SQSをKMSのCMKで暗号化しておりますので、そのポリシーもアタッチしました。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:GenerateDataKey*"
            ],
            "Resource": "(your KMS key ARN)"
        }
    ]
}

冪等性の実装として以前の記事のDynamoDBを使用しましたので、その時作成した下記のポリシー(Allow_DynamoDB_Idempotent_test_putitem)をアタッチしました。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "dynamodb:PutItem",
                "dynamodb:DeleteItem"
            ],
            "Resource": "arn:aws:dynamodb:ap-northeast-1:(your account ID):table/Idempotent-test"
        }
    ]
}

関数のソースコード

Lambda関数のソースコードは、”Lambdaの冪等性を実装する”で使用したソースを使用しました。

import boto3
import json
import botocore.exceptions

TBL_NAME = 'Idempotent-test'

dynamodb = boto3.client('dynamodb')


def lock(transaction_id: str):
    try:
        dynamodb.put_item(
            TableName=TBL_NAME,
            Item={"transaction_id": {"S": transaction_id}},
            Expected={'transaction_id': {'Exists': False}}
        )
        return True
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return False
        else:
            raise e
    except Exception as e:
        raise e


def lambda_handler(event, context):
    try:
        for record in event['Records']:
            transaction_id = record['attributes']['MessageDeduplicationId']
            print(transaction_id)

            if(lock(transaction_id)):
                try:
                    # メイン処理
                    print(record)
                    print('done!')
                    
                except Exception as e:
                    dynamodb.delete_item(
                        TableName=TBL_NAME,
                        Key={"transaction_id": {"S": transaction_id}}
                    )
                    raise e
            else:
                print('nothing to do')
    except botocore.exceptions.ClientError as e:
        print('critical error1')
        print(e)
        raise e
    except Exception as e:
        print('critical error2')
        print(e)
        raise e

トリガーの追加

Lambda関数の「トリガーの追加」ボタンよりトリガーを作成します。

SQSを選択して、SQSのキューを選択します。

確認

前回作成したsqsSend.phpでSQSにメッセージを送信して、メッセージがLambdaで処理され、メッセージが削除されたことを確認します。

次回

次回は、「SQSで正常に処理できないメッセージをデッドレターキューに移動してSNSで通知する」を構築していきます。


コメントを残す

メールアドレスが公開されることはありません。