Kinesis Streams の基本操作

Kinesis Data Streams

やること

Kinesis Streams の基本操作を公式ドキュメントのチュートリアルを使って確認します。

Use the AWS CLI to perform Amazon Kinesis Data Streams operations - Amazon Kinesis Data Streams
Topics for customers new to using Amazon Kinesis Data Streams.

ストリームの作成

まずは、ストリームの作成です。今回は create-stream コマンドを利用してストリームを作成します。

create-stream — AWS CLI 2.17.55 Command Reference

create-stream コマンドのオプションを確認すると --stream-name のみが必須項目のようです。今回は testStream という名前のストリームを作成します。

aws kinesis create-stream --stream-name testStream

コマンドのドキュメントにも記載されていますが、create-stream コマンドは非同期操作なため、コマンドの実行結果としてストリームの作成状況を取得できません。ストリームの状況を確認するため、describe-stream-summary コマンドを実行して、ストリームが “ACTIVE” になっていることを確認します。

aws kinesis describe-stream-summary --stream-name testStream
{
    "StreamDescriptionSummary": {
        "StreamName": "testStream",
        "StreamARN": "arn:aws:kinesis:ap-northeast-1:************:stream/testStream",
        "StreamStatus": "ACTIVE",
        "StreamModeDetails": {
            "StreamMode": "ON_DEMAND"
        },
        "RetentionPeriodHours": 24,
        "StreamCreationTimestamp": "2024-03-08T01:32:17+00:00",
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "OpenShardCount": 4,
        "ConsumerCount": 0
    }
}

create-stream コマンドでは、--stream-mode-details オプションでキャパシティモードを指定可能です。--stream-mode-details オプションを指定せずにストリームを作成した場合、デフォルトのキャパシティモードはオンデマンドモードになるようです。

以下で言及されているようにオンデマンドモードの作成時のデフォルトスループットは、書き込み 4 MB/s、読み取り 8 MB/s です。describe-stream-summary コマンドの結果より、デフォルトで作成されるシャードは 4 つであることがわかります。Kinesis Streams のシャード単体でのスループットが、書き込み 1 MB/s、読み取り 2 MB/s なので、ストリーム全体では、ちょうど 4 シャード分のスループットを持っていることがわかります。

Data stream throughput

By default, new data streams created with the on-demand capacity mode have 4 MB/s of write and 8 MB/s of read throughput.

https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html

作成された 4 シャードについて、各シャードに対応するハッシュキーの範囲を確認します。

aws kinesis describe-stream --stream-name testStream | jq -r '.StreamDescription.Shards[]'
{
  "ShardId": "shardId-000000000000",
  "HashKeyRange": {
    "StartingHashKey": "0",
    "EndingHashKey": "85070591730234615865843651857942052863"
  },
  "SequenceNumberRange": {
    "StartingSequenceNumber": "49649956290020486760759299466593291715655677405781032962"
  }
}
{
  "ShardId": "shardId-000000000001",
  "HashKeyRange": {
    "StartingHashKey": "85070591730234615865843651857942052864",
    "EndingHashKey": "170141183460469231731687303715884105727"
  },
  "SequenceNumberRange": {
    "StartingSequenceNumber": "49649956290042787505957830089734827433928325767287013394"
  }
}
{
  "ShardId": "shardId-000000000002",
  "HashKeyRange": {
    "StartingHashKey": "170141183460469231731687303715884105728",
    "EndingHashKey": "255211775190703847597530955573826158591"
  },
  "SequenceNumberRange": {
    "StartingSequenceNumber": "49649956290065088251156360712876363152200974128792993826"
  }
}
{
  "ShardId": "shardId-000000000003",
  "HashKeyRange": {
    "StartingHashKey": "255211775190703847597530955573826158592",
    "EndingHashKey": "340282366920938463463374607431768211455"
  },
  "SequenceNumberRange": {
    "StartingSequenceNumber": "49649956290087388996354891336017898870473622490298974258"
  }
}

ここまでで以下のようなストリームを作成できました。

レコードの書き込み

続いて、作成したストリームにレコードを書き込んでいきます。

Kinesis Streams にデータを書き込む方法は、PutRecordPutRecords のいずれかです。PutRecord は単一のレコードを書き込むのに対し、PutRecords は複数のレコードを一括で書き込むことができます。

https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html

今回は、PutRecord を使用します。PutRecord では、対象ストリームの情報とデータ、パーティションキーの指定が必須となっています。

aws kinesis put-record --stream-name testStream --partition-key 123 --data testdata
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49649956290020486760759299466623514861146159752100708354"
}

今回、パーティションキーに “123” を指定し、shardId-000000000000 にデータが書き込まれたようです。パーティションキー “123” のハッシュキーを計算してみると、確かに shardId-000000000000 に対応するハッシュキーの範囲に含まれていることを確認できます。

>>> import hashlib
>>>
>>> # ハッシュ関数を利用してハッシュキーを計算
>>> md5hash=hashlib.md5(b'123').hexdigest()
>>> print(md5hash)
202cb962ac59075b964b07152d234b70
>>>
>>> # 計算したハッシュキーを 10 進数に変換
>>> print(int(md5hash,16))
42767516990368493138776584305024125808
>>>
>>> # 範囲を確認
>>> print(0 <= 42767516990368493138776584305024125808 <= 85070591730234615865843651857942052863)
True

書き込み先のシャードはパーティションキーをもとに決定されるため、パーティションキーの値が偏ると、特定のシャードへ負荷が偏ってしまいます。ストリームに含まれるシャードがいくら多かろうと、シャード単位でスループットの上限は決まっているため、負荷が偏った結果としてスロットリングする可能性があります。

レコードの取得

最後に、先ほど書き込んだレコードを確認します。

Kinesis Streams からデータを読み取る方法は、GetRecordsSubscribeToShard のいずれかです。SubscribeToShard は拡張ファンアウトで使用されます。

https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html

GetRecords を使って Kinesis Streams からレコードを読み取るには、2 段階の手順を踏む必要があります。

シャードイテレータの取得

レコードを取得する前にシャードイテレータを取得します。シャードイテレータは、GetRecords で読み取る範囲を示すものと考えると理解しやすいかと思います。

シャードは、データを格納するベルトコンベアのようなものです。シーケンス番号順に格納されたデータが並んでいる中で、シャードイテレータはどこからどこまでの範囲を取得するかを表します。この範囲は、シャードイテレータタイプによって変わります。

実際にシャードイテレータを取得してみます。

aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name testStream
{
    "ShardIterator": "AAAAAAAAAAF9/JF/WiKq4zDr5z++fEhO8WlfFjZ993Cc+T+DsZSTqH+wL+9wK0lQf+er0DFD3YSmcUzlE8A6596lY1BVNNjVEYozq9GZXw2UeVkyTcRkfOKwoLJkAS8eXgwR6hDZG75l5lEUJAH86vZAN52E0xFbbFVmtHT4jdAxM5o2iiosCz6Ck7P/mGqKdIKnQyY15zbQbOo8uUBeoiOaF+dc3kZxtzpmr67QTzeHfHVw3HThyA=="
}

TRIM_HORIZON タイプのシャードイテレータを取得できました。一度取得したシャードイテレータの有効期限は 300 秒なので、有効期限が切れる前にレコードを取得します。

レコードの取得

取得したシャードイテレータを利用してレコードを取得します。

aws kinesis get-records --shard-iterator AAAAAAAAAAF9/JF/WiKq4zDr5z++fEhO8WlfFjZ993Cc+T+DsZSTqH+wL+9wK0lQf+er0DFD3YSmcUzlE8A6596lY1BVNNjVEYozq9GZXw2UeVkyTcRkfOKwoLJkAS8eXgwR6hDZG75l5lEUJAH86vZAN52E0xFbbFVmtHT4jdAxM5o2iiosCz6Ck7P/mGqKdIKnQyY15zbQbOo8uUBeoiOaF+dc3kZxtzpmr67QTzeHfHVw3HThyA==
{
    "Records": [
        {
            "SequenceNumber": "49649956290020486760759299466623514861146159752100708354",
            "ApproximateArrivalTimestamp": "2024-03-08T02:00:32.879000+00:00",
            "Data": "testdata",
            "PartitionKey": "123"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAE8TYjsy8GgjqrBAhbuUOQVGVqh5ypVW/YefdYRf+cFeu4ddJX3cAY4VoXjQGRO3Wt8w8BKy7s5Cje5imTkMiciBwq118mMu4+Lr9nN9C4CafkT87y7+DkKRaqe+BrAAhTCG2w+4dBOthfqKtHFT6AZbdxaGFl3bjrIkqWUn/2DijucbsaRTyAG7/mVZMRdvUjCLeqlgelYhwMnHB7Nj2yy7du/y+I4nFAy6VGts8Ywrg==",
    "MillisBehindLatest": 0
}

レコードを取得することができました。

1 回の GetRecords で必ずすべてのレコードを取得できるかはわかりません。シャードイテレータはレコードを取得する範囲を表すので、その範囲に含まれないレコードは取得されません。もし 1 回の GetRecords で必要なレコードに到達できない場合には、GetRecords の返り値に含まれる NextShardIterator を使用して、次の範囲のレコードを取得します。

シャードイテレータタイプ
概要 Kinesis Streams からレコードを取得する際、レコード取得の前にシャードイテレータを取得します。シャードイテレータは GetRecords で読み取る範囲を示しますが、シャードイテレータタイプによってその範囲の開始地点が変...