Consumer (possibly) stuck if sequence # refers to expired data #125

Open
jtackaberry opened this issue Aug 4, 2020 · 1 comment

@jtackaberry

I have the following scenario:

  1. Kinesis data stream with 24 hour retention
  2. A previous run of kinesis-consumer with a checkpointed sequence number
  3. kinesis-consumer stopped for > 24 hours such that the stored sequence number refers to expired data

Now I find that upon restarting the consumer, it continues to read 0 records with MillisBehindLatest remaining at 86400000 (24 hours). The shard iterator changes after each GetRecords call, but each request yields 0 records and still reports 86400000 ms behind.

I've let this run for about 15 minutes of continuously polling GetRecords without finding any actual records. I can't be sure it wouldn't eventually find data and finally advance the sequence number, but at least after ~15 minutes of that there wasn't any real sign of progress.

So I began investigating, and implemented the following bit of logic in the consumer which quickly (within about 5 seconds) gets things moving again:

  1. In Consumer.Scan(), call c.client.DescribeStreamSummaryWithContext() to discover the retention period for the stream.
  2. In Consumer.ScanShard(), if GetRecords returns 0 records and MillisBehindLatest is >= the retention period from step 1, then we can infer we have encountered this condition, and so we fetch a new shard iterator of type ShardIteratorTypeTrimHorizon.
  3. Step 2 is only ever done once if the condition described exists, and is never done if records were ever observed from the shard, to better ensure we are targeting only this edge case.

Like the immediate rescan stuff from #122, fetching ShardIteratorTypeTrimHorizon also takes the fast path, skipping the scan ticker. But I've also implemented the "fast path" via a new ticker which is fixed at 200ms, based on the published data plane API limits, which also addresses the throttling concern mentioned in #122.
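To make the 200ms figure concrete: GetRecords is limited to 5 calls per second per shard, so one call every 200ms stays within that limit. A minimal sketch of choosing the delay is below; `nextPollDelay`, `fastPathInterval`, and `exampleScanInterval` are hypothetical names and values for illustration, not identifiers from the library.

```go
package main

import "time"

// fastPathInterval spaces calls 200ms apart, i.e. at most 5 GetRecords
// calls per second against a single shard.
const fastPathInterval = 200 * time.Millisecond

// exampleScanInterval is a made-up value standing in for the configured
// scan interval of the regular scan ticker.
const exampleScanInterval = time.Second

// nextPollDelay returns how long to wait before the next GetRecords call.
// On the fast path (immediate rescan or TRIM_HORIZON reset) the fixed
// 200ms interval is used; otherwise the normal scan interval applies.
func nextPollDelay(fastPath bool, scanInterval time.Duration) time.Duration {
	if fastPath {
		return fastPathInterval
	}
	return scanInterval
}
```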

I can clean up the code and submit a PR but wanted to float the idea by you. Obviously all this work I've been doing lately has really complicated your beautifully simple shard scanning loop, but I am finding that dealing with Kinesis edge cases in practice is a bit of a subtle science and exact art. :)

@jtackaberry

Another noteworthy thing about this is that it would require an additional permission on the stream for kinesis:DescribeStreamSummary (or kinesis:DescribeStream*), so it may need to be opt-in.
