2021.12.15
Protocol Buffersを使ってSQSのメッセージを構造化してみた
はじめに
こんにちは。RAKSUL Advent Calendar 2021 15日目を担当するハコベルの吉岡です。
本日はメッセージキューを使う際にProtocol Buffersを使って構造化されたメッセージのやりとりができないかのPoC(Proof of Concept)を紹介させていただきます。
背景
今取り組んでいるプロジェクト内でAmazon SQSを使うことになったのですが、現段階でSQSにメッセージをエンキューするのがGoのサービスで、メッセージを受け取るのがPythonのサービスということが想定されています。SQSの技術検証を進める中で「構造化されたデータをメッセージとしてやりとりできないか」とふと思い、「Protobufをうまいこと使えないか」という考えが浮かびました。
事例がないかリサーチしていると、自分がやりたいことと同じことをやっている方の記事があったのでこちらを参考に進めてみました。
今回はメッセージのプロデューサーをGo、コンシューマーをPythonで書いた場合の例を紹介したいと思います。
なお、ある程度Protocol Buffer, Message Queue, Docker, Go, Pythonの知識があることを前提に書いています。ご了承ください。
サンプルコード
いきなりですがサンプルコードはこちらです。
あくまでPoCなのでコードが少し雑なのはお許しください。こちらを見ながら読み進めていただけると理解しやすいと思います。それでは解説していきます。
.protoファイルを用意する
まずはproto
フォルダを作り、その中に.protoファイルを作成します。今回はPoC用のシンプルなメッセージを定義します。
message Message { string title = 1; repeated Sentence sentences = 2; } message Sentence { int64 id = 1; string text = 2; }
https://github.com/no7wataru/proto-sqs-poc/blob/main/proto/message.proto
コードを生成する
.protoファイルが用意できたら、続いてはGoとPythonのコードを生成します。protoc実行用のDockerイメージを作り、それを使ってコードを生成してみます。Dockerfileはこんな感じ。
FROM golang:1.17-buster ARG VERSION=3.19.1 RUN apt update -y && apt install -y curl unzip RUN curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v${VERSION}/protoc-${VERSION}-linux-x86_64.zip RUN unzip protoc-${VERSION}-linux-x86_64.zip -d /usr/local RUN rm protoc-${VERSION}-linux-x86_64.zip RUN go install google.golang.org/protobuf/cmd/protoc-gen-go@latest RUN export PATH="$PATH:$(go env GOPATH)/bin" ENTRYPOINT ["protoc"]
https://github.com/no7wataru/proto-sqs-poc/blob/main/proto/Dockerfile
イメージをビルド。
$ docker build -f proto/Dockerfile -t protoc .
ビルドしたイメージを使って使ってコードを生成します。
$ docker run --rm -v (eval pwd)/proto:/proto protoc --proto_path=/proto --go_out=/proto --go_opt=paths=source_relative --python_out=/proto message.proto
実行するとprotoフォルダ内にmessage.pb.go
とmessage_pb2.py
というファイルが生成されているのが確認できます。
メッセージキューを立ち上げる
実際の環境ではAmazon SQSを使う予定ですが、今回はSQSと互換性のあるElasticMQを使ってローカルにメッセージキューのサービスを立ち上げて使っていきます。
mq
というフォルダを作成してその中にelasticmq.conf
というファイルを作ります。こんな感じに書いておくとproto-test-mq
というキューが起動時に作成されます。
queues { proto-test-mq { defaultVisibilityTimeout = 10 minutes delay = 0 seconds receiveMessageWait = 20 seconds fifo = false contentBasedDeduplication = false } }
https://github.com/no7wataru/proto-sqs-poc/blob/main/mq/elasticmq.conf
あとは以下のようにdocker run
を実行するとメッセージキューが立ち上がります。
$ docker run --rm -p 9324:9324 -p 9325:9325 -v (eval pwd)/mq/elasticmq.conf:/opt/elasticmq.conf softwaremill/elasticmq-native
ElasticMQは簡易的なUIも用意されるのでそちらでキューの情報を確認することもできます。9325のポートが簡易UIになります。
http://localhost:9325
プロデューサーを作成する
続いてメッセージを送信するプロデューサーをGoで書きます。出来上がったコードはこんな感じです。
package main import ( "encoding/base64" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" pb "github.com/no7wataru/proto-sqs-poc/proto" "google.golang.org/protobuf/proto" ) const ( endpoint = "http://localhost:9324" region = "ap-northeast-1" queueName = "proto-test-mq" ) func main() { sess, err := session.NewSession() if err != nil { panic(err) } client := sqs.New(sess, &aws.Config{ Endpoint: aws.String(endpoint), Region: aws.String(region), }) output, err := client.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String(queueName), }) if err != nil { panic(err) } println(*output.QueueUrl) m := &pb.Message{ Title: "This is the title of the message", Sentences: []*pb.Sentence{ { Id: 1, Text: "This is the first sentence.", }, { Id: 2, Text: "This is the second sentence.", }, }, } data, _ := proto.Marshal(m) message := base64.StdEncoding.EncodeToString(data) res, err := client.SendMessage(&sqs.SendMessageInput{ QueueUrl: output.QueueUrl, MessageBody: aws.String(message), }) if err != nil { panic(err) } println(*res.MessageId) }
https://github.com/no7wataru/proto-sqs-poc/blob/main/producer/main.go
実際にSQSを使う場合ははじめのsessionを作る部分を少し変更する必要があるはずです。ローカルのElasticMQで試す分にはこれで十分です。
メッセージを作成したらproto.Marshal()
でシリアライズ → base64で文字列に変換 → エンキュー という流れになります。実際に実行してみます。
$ go run producer/main.go http://localhost:9324/012345678901/proto-test-mq a765e3fd-1c39-48f5-b320-54e71a901900
ElasticMQのダッシュボードからもメッセージがうまくエンキューされているのが確認できます。
コンシューマーを作成する
続いてはメッセージを受信するコンシューマーのコードになります。こちらはPythonです。
import os import sys import base64 import boto3 sys.path.append(os.path.normpath(os.path.join( os.path.dirname(os.path.abspath(__file__)), '..'))) from proto.message_pb2 import Message endpoint = 'http://localhost:9324' region = 'ap-northeast-1' queue_name = 'proto-test-mq' access_key = 'x' secret_key = 'x' client = boto3.resource('sqs', endpoint_url=endpoint, region_name=region, aws_secret_access_key=access_key, aws_access_key_id=secret_key, use_ssl=False) queue = client.get_queue_by_name(QueueName=queue_name) messages = queue.receive_messages( AttributeNames=[ 'All' ], MaxNumberOfMessages=1, MessageAttributeNames=[ 'All' ], VisibilityTimeout=0, WaitTimeSeconds=0 ) for message in messages: data = base64.b64decode(message.body) msg = Message() msg.ParseFromString(data) print(msg.title) for sentence in msg.sentences: print(sentence.id) print(sentence.text) message.delete()
https://github.com/no7wataru/proto-sqs-poc/blob/main/consumer/consumer.py
今度はプロデューサーで行ったのと逆の処理をし、データをパースします。
コードを実行すると無事に結果が出力されます。
$ python consumer/consumer.py This is the title of the message 1 This is the first sentence. 2 This is the second sentence.
まとめ
いかがだったでしょうか?Protocol Buffersを使うことで構造化されたデータとしてメッセージを送受信することができました。メッセージの構造に変更があった際も.protoファイルを更新してコードを再生成するだけなので簡単ですね。
また、サンプルコード内では省略しましたが、SQSにメッセージを送信する際に一緒にメタデータを渡すことができます。.protoのバージョン等をメッセージと共に送信することでデータ構造に変更があった際も柔軟に対応できると思います。
Amazon SQSなどのメッセージキューを使う際には是非検討してみてください。
ハコベルチームでは一緒に働くメンバーを募集しています!
興味ある方はぜひこちらからご応募ください!
ラクスルのアドベントカレンダー全編はこちらから。
Protocol Buffersを使ってSQSのメッセージを構造化してみた
Featured posts
in #Technology