
ProvisionedThroughputExceededException and new batches are triggered even if avoidEmptyBatches=true #97

Open
juliankeppel opened this issue Jan 8, 2021 · 6 comments

juliankeppel (Contributor) commented Jan 8, 2021

I noticed that new batches are triggered even if all shards are empty and avoidEmptyBatches is set to true. This leads to ProvisionedThroughputExceededExceptions (I don't understand why at the moment). I thought this issue would resolve the problem, but it persists even after upgrading the kinesis-sql library to the newest version.

As a workaround, we thought avoidEmptyBatches would mitigate the problem by avoiding too many API calls when no data is available in the Kinesis stream.

But the exceptions still come up, and we can see in the logs that new batches are triggered, always for the same sequence number, even if there is no new data at all:

21/01/08 08:19:31 INFO KinesisSource: Purging Committed Entries. ThresholdBatchId = 43986
21/01/08 08:19:31 INFO KinesisSource: End Offset is {"metadata":{"streamName":"viewprogress","batchId":"44086"},"shardId-000000000000":{"iteratorType":"AFTER_SEQUENCE_NUMBER","iteratorPosition":"49613635859865357079366475818029505419698299042659303426"}}
21/01/08 08:19:31 INFO KinesisSource: Processing 1 shards from Stream(ShardInfo(shardId-000000000000,AFTER_SEQUENCE_NUMBER,49613635859865357079366475818029505419698299042659303426))
21/01/08 08:19:31 INFO KinesisSource: GetBatch generating RDD of offset range: ShardInfo(shardId-000000000000,AFTER_SEQUENCE_NUMBER,49613635859865357079366475818029505419698299042659303426)

This is the code we use to create the stream:

watchlist_events_stream = (
    spark.readStream.format("kinesis")
    .option("streamName", KINESIS_STREAM_NAME)
    .option("endpointUrl", KINESIS_ENDPOINT_URL)
    .option(
        "awsstsrolearn",
        f"{ASSUME_ROLE_ARN_PREFIX}{ACCOUNT_ID_DICT[args.stage]}{ASSUME_ROLE_ARN_SUFFIX}",
    )
    .option("awsstssessionname", SESSION_NAME)
    .option("kinesis.client.avoidEmptyBatches", "true")
    .load()
)

Any idea what could be going wrong here?

juliankeppel (Contributor, Author) commented:

@itsvikramagr Sorry for pushing this, but we use this connector in production and need some advice on how to avoid those LimitExceeded errors on "empty" streams. After some retries, our jobs break, and the only thing we can do at the moment is restart them.

Do you have any idea why we get all those errors and how to fix them? And why avoidEmptyBatches doesn't have any effect?

itsvikramagr (Contributor) commented:

@juliankeppel - I would have to look into the complete logs to give better suggestions.

When do you see "ProvisionedThroughputExceededException"? Most likely Kinesis is throttling us when we make getRecords calls in the executors. Does it happen when there are non-empty streams? Are there multiple readers of the same Kinesis stream?
Can you try setting these configs:

  • kinesis.executor.addIdleTimeBetweenReads = true
  • kinesis.executor.idleTimeBetweenReadsInMs (default is 1s)

In your streaming job, what is your trigger interval?

avoidEmptyBatches has no effect only when there is some resharding, or when we have reached shardEnd in one of the shards in the previous micro-batch (https://github.com/qubole/kinesis-sql/blob/master/src/main/scala/org/apache/spark/sql/kinesis/KinesisSource.scala#L178), which might not be true in your case. Can you check whether you are using the latest connector?
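
For example, applied to the reader from your first post (STS options omitted for brevity; the 2000 ms idle time is only an illustrative value, not a recommendation):

watchlist_events_stream = (
    spark.readStream.format("kinesis")
    .option("streamName", KINESIS_STREAM_NAME)
    .option("endpointUrl", KINESIS_ENDPOINT_URL)
    .option("kinesis.client.avoidEmptyBatches", "true")
    # Add a delay between consecutive getRecords calls in the executors
    .option("kinesis.executor.addIdleTimeBetweenReads", "true")
    .option("kinesis.executor.idleTimeBetweenReadsInMs", "2000")
    .load()
)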

juliankeppel (Contributor, Author) commented Jan 15, 2021

@itsvikramagr It happens only for empty streams. There is one more consumer, but since it happens only for empty streams, it's probably not a problem of too many read requests.

We have different jobs consuming different streams, with trigger intervals between 5 seconds and 15 minutes. The problem is the same for all of them when the source streams are empty.

There is no resharding going on; we have a stable number of shards all the time.

I think we have reached shardEnd in this case. But why doesn't avoidEmptyBatches have any effect then? In my understanding, that is the main scenario for avoiding empty batches: we are at shardEnd and waiting for new records to arrive.

I don't understand why the connector is firing so many read requests when it has reached shardEnd and no new records are currently coming in, and why the LimitExceeded errors appear only in this situation and not when there are records in the stream (which is the normal case).

itsvikramagr (Contributor) commented Jan 15, 2021

I think we have reached shardEnd in this case. But why doesn't avoidEmptyBatches have any effect then?

Due to existing logic around shardEnd filtering, one empty micro-batch can be created even with avoidEmptyBatches. In that micro-batch, shards which have reached their end are filtered out from further processing. We could definitely fix the logic, but it would be a breaking change to the existing behavior (of filtering shardEnd), so I have avoided that.
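
Schematically, the behavior is something like the following (a Python sketch of the logic described above, not the connector's actual Scala code; all names here are illustrative):

from dataclasses import dataclass
from typing import List, Optional

@dataclass
class ShardInfo:
    shard_id: str
    at_shard_end: bool   # True once the shard has been closed

@dataclass
class Batch:
    shards: List[ShardInfo]

def plan_next_batch(shards: List[ShardInfo],
                    avoid_empty_batches: bool,
                    has_new_data: bool) -> Optional[Batch]:
    if any(s.at_shard_end for s in shards):
        # One (possibly empty) micro-batch is still created so the ended
        # shards can be filtered out before further processing.
        return Batch(shards=[s for s in shards if not s.at_shard_end])
    if avoid_empty_batches and not has_new_data:
        return None  # skip the trigger entirely
    return Batch(shards=shards)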

I don't understand why the connector is firing so many read requests when it has reached shardEnd and no new records are currently coming in, and why the LimitExceeded errors appear only in this situation and not when there are records in the stream (which is the normal case).

We have to look into the logs to understand that. My guess is that we are making too many getRecords calls too frequently. The logs will have more details.

juliankeppel (Contributor, Author) commented:

I attached a log from one of our jobs. I realized that I misunderstood what shardEnd means: it means that a shard has reached its end of life, which is not the case in our scenario. You can see in the logs that it's always reading from the same shard, with the same sequence number, over and over again.

But I restarted the job for some experiments, and now I don't see any of the ReadLimitExceeded errors anymore. I can't explain why, because I changed nothing in the code or configuration.

Anyway, the problem with avoidEmptyBatches seems to persist. Maybe you can see something in the attached logs.

log.txt

itsvikramagr (Contributor) commented:

I can see that empty batches are being created. There is a check for new records before we start a new batch, and that check is always returning true.

You can find lines like the following in your log:

21/01/18 08:18:19 DEBUG KinesisSource: Can create new batch = true

https://github.com/qubole/kinesis-sql/blob/master/src/main/scala/org/apache/spark/sql/kinesis/KinesisSource.scala#L131 seems to be the reason for it. I think that even though there are no new records, getMillisBehindLatest is non-zero. We can verify this: comment out the check on millisBehindLatest and rerun your streaming jobs.
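
Schematically, the check amounts to something like this (a Python sketch of the condition described above, not the actual Scala code; names are illustrative):

def can_create_new_batch(fetched_records: list, millis_behind_latest: int) -> bool:
    # A new micro-batch is triggered if the probe read returned records,
    # OR if Kinesis reports the consumer as behind the stream tip. If
    # getRecords returns no records but millisBehindLatest stays non-zero,
    # this still returns True -- producing the empty batches seen here.
    return len(fetched_records) > 0 or millis_behind_latest > 0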

ProvisionedThroughputExceededException might be a side effect of creating empty batches. In the executors, we make multiple getRecords calls to read data. We can add delays before making getRecords requests by using the following config: kinesis.executor.addIdleTimeBetweenReads = true
