やること
Kinesis Streams の基本操作を公式ドキュメントのチュートリアルを使って確認します。
ストリームの作成
まずは、ストリームの作成です。今回は create-stream
コマンドを利用してストリームを作成します。
create-stream
コマンドのオプションを確認すると --stream-name
のみが必須項目のようです。今回は testStream という名前のストリームを作成します。
aws kinesis create-stream --stream-name testStream
コマンドのドキュメントにも記載されていますが、create-stream
コマンドは非同期操作なため、コマンドの実行結果としてストリームの作成状況を取得できません。ストリームの状況を確認するため、describe-stream-summary
コマンドを実行して、ストリームが “ACTIVE” になっていることを確認します。
aws kinesis describe-stream-summary --stream-name testStream
{
"StreamDescriptionSummary": {
"StreamName": "testStream",
"StreamARN": "arn:aws:kinesis:ap-northeast-1:************:stream/testStream",
"StreamStatus": "ACTIVE",
"StreamModeDetails": {
"StreamMode": "ON_DEMAND"
},
"RetentionPeriodHours": 24,
"StreamCreationTimestamp": "2024-03-08T01:32:17+00:00",
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"OpenShardCount": 4,
"ConsumerCount": 0
}
}
create-stream
コマンドでは、--stream-mode-details
オプションでキャパシティモードを指定可能です。--stream-mode-details
オプションを指定せずにストリームを作成した場合、デフォルトのキャパシティモードはオンデマンドモードになるようです。
以下で言及されているようにオンデマンドモードの作成時のデフォルトスループットは、書き込み 4 MB/s、読み取り 8 MB/s です。describe-stream-summary
コマンドの結果より、デフォルトで作成されるシャードは 4 つであることがわかります。Kinesis Streams のシャード単体でのスループットが、書き込み 1 MB/s、読み取り 2 MB/s なので、ストリーム全体では、ちょうど 4 シャード分のスループットを持っていることがわかります。
Data stream throughput
By default, new data streams created with the on-demand capacity mode have 4 MB/s of write and 8 MB/s of read throughput.
https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
作成された 4 シャードについて、各シャードに対応するハッシュキーの範囲を確認します。
aws kinesis describe-stream --stream-name testStream | jq -r '.StreamDescription.Shards[]'
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "85070591730234615865843651857942052863"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49649956290020486760759299466593291715655677405781032962"
}
}
{
"ShardId": "shardId-000000000001",
"HashKeyRange": {
"StartingHashKey": "85070591730234615865843651857942052864",
"EndingHashKey": "170141183460469231731687303715884105727"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49649956290042787505957830089734827433928325767287013394"
}
}
{
"ShardId": "shardId-000000000002",
"HashKeyRange": {
"StartingHashKey": "170141183460469231731687303715884105728",
"EndingHashKey": "255211775190703847597530955573826158591"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49649956290065088251156360712876363152200974128792993826"
}
}
{
"ShardId": "shardId-000000000003",
"HashKeyRange": {
"StartingHashKey": "255211775190703847597530955573826158592",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49649956290087388996354891336017898870473622490298974258"
}
}
ここまでで以下のようなストリームを作成できました。
レコードの書き込み
続いて、作成したストリームにレコードを書き込んでいきます。
Kinesis Streams にデータを書き込む方法は、PutRecord
か PutRecords
のいずれかです。PutRecord
は単一のレコードを書き込むのに対し、PutRecords
は複数のレコードを一括で書き込むことができます。
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html
今回は、PutRecord
を使用します。PutRecord
では、対象ストリームの情報とデータ、パーティションキーの指定が必須となっています。
aws kinesis put-record --stream-name testStream --partition-key 123 --data testdata
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49649956290020486760759299466623514861146159752100708354"
}
今回、パーティションキーに “123” を指定し、shardId-000000000000 にデータが書き込まれたようです。パーティションキー “123” のハッシュキーを計算してみると、確かに shardId-000000000000 に対応するハッシュキーの範囲に含まれていることを確認できます。
>>> import hashlib
>>>
>>> # ハッシュ関数を利用してハッシュキーを計算
>>> md5hash=hashlib.md5(b'123').hexdigest()
>>> print(md5hash)
202cb962ac59075b964b07152d234b70
>>>
>>> # 計算したハッシュキーを 10 進数に変換
>>> print(int(md5hash,16))
42767516990368493138776584305024125808
>>>
>>> # 範囲を確認
>>> print(0 <= 42767516990368493138776584305024125808 <= 85070591730234615865843651857942052863)
True
書き込み先のシャードはパーティションキーをもとに決定されるため、パーティションキーの値が偏ると、特定のシャードへ負荷が偏ってしまいます。ストリームに含まれるシャードがいくら多かろうと、シャード単位でスループットの上限は決まっているため、負荷が偏った結果としてスロットリングする可能性があります。
レコードの取得
最後に、先ほど書き込んだレコードを確認します。
Kinesis Streams からデータを読み取る方法は、GetRecords
か SubscribeToShard
のいずれかです。SubscribeToShard
は拡張ファンアウトで使用されます。
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html
GetRecords
を使って Kinesis Streams からレコードを読み取るには、2 段階の手順を踏む必要があります。
シャードイテレータの取得
レコードを取得する前にシャードイテレータを取得します。シャードイテレータは、GetRecords
で読み取る範囲を示すものと考えると理解しやすいかと思います。
シャードは、データを格納するベルトコンベアのようなものです。シーケンス番号順に格納されたデータが並んでいる中で、シャードイテレータはどこからどこまでの範囲を取得するかを表します。この範囲は、シャードイテレータタイプによって変わります。
実際にシャードイテレータを取得してみます。
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name testStream
{
"ShardIterator": "AAAAAAAAAAF9/JF/WiKq4zDr5z++fEhO8WlfFjZ993Cc+T+DsZSTqH+wL+9wK0lQf+er0DFD3YSmcUzlE8A6596lY1BVNNjVEYozq9GZXw2UeVkyTcRkfOKwoLJkAS8eXgwR6hDZG75l5lEUJAH86vZAN52E0xFbbFVmtHT4jdAxM5o2iiosCz6Ck7P/mGqKdIKnQyY15zbQbOo8uUBeoiOaF+dc3kZxtzpmr67QTzeHfHVw3HThyA=="
}
TRIM_HORIZON タイプのシャードイテレータを取得できました。一度取得したシャードイテレータの有効期限は 300 秒なので、有効期限が切れる前にレコードを取得します。
レコードの取得
取得したシャードイテレータを利用してレコードを取得します。
aws kinesis get-records --shard-iterator AAAAAAAAAAF9/JF/WiKq4zDr5z++fEhO8WlfFjZ993Cc+T+DsZSTqH+wL+9wK0lQf+er0DFD3YSmcUzlE8A6596lY1BVNNjVEYozq9GZXw2UeVkyTcRkfOKwoLJkAS8eXgwR6hDZG75l5lEUJAH86vZAN52E0xFbbFVmtHT4jdAxM5o2iiosCz6Ck7P/mGqKdIKnQyY15zbQbOo8uUBeoiOaF+dc3kZxtzpmr67QTzeHfHVw3HThyA==
{
"Records": [
{
"SequenceNumber": "49649956290020486760759299466623514861146159752100708354",
"ApproximateArrivalTimestamp": "2024-03-08T02:00:32.879000+00:00",
"Data": "testdata",
"PartitionKey": "123"
}
],
"NextShardIterator": "AAAAAAAAAAE8TYjsy8GgjqrBAhbuUOQVGVqh5ypVW/YefdYRf+cFeu4ddJX3cAY4VoXjQGRO3Wt8w8BKy7s5Cje5imTkMiciBwq118mMu4+Lr9nN9C4CafkT87y7+DkKRaqe+BrAAhTCG2w+4dBOthfqKtHFT6AZbdxaGFl3bjrIkqWUn/2DijucbsaRTyAG7/mVZMRdvUjCLeqlgelYhwMnHB7Nj2yy7du/y+I4nFAy6VGts8Ywrg==",
"MillisBehindLatest": 0
}
レコードを取得することができました。
1 回の GetRecords
で必ずすべてのレコードを取得できるかはわかりません。シャードイテレータはレコードを取得する範囲を表すので、その範囲に含まれないレコードは取得されません。もし 1 回の GetRecords
で必要なレコードに到達できない場合には、GetRecords
の返り値に含まれる NextShardIterator
を使用して、次の範囲のレコードを取得します。