
Issue with Stream containing a lot of NO PUT ACTIVITY #100

Open
WTa-hash opened this issue Jan 29, 2021 · 3 comments

WTa-hash commented Jan 29, 2021

I have a Spark Structured Streaming job that uses the Qubole Kinesis connector 1.2.0 against a Kinesis stream with 2 shards and a 2-day retention period.

These are my Qubole Kinesis configs:

streamName: '.......'
endpointUrl: 'https://kinesis.us-east-1.amazonaws.com'
startingPosition: 'latest'
failondataloss: false
kinesis.executor.maxFetchTimeInMs: 60000
kinesis.executor.maxFetchRecordsPerShard: 1000000
kinesis.executor.maxRecordPerRead: 10000
kinesis.executor.addIdleTimeBetweenReads: true
kinesis.executor.idleTimeBetweenReadsInMs: 1000
kinesis.client.describeShardInterval: '1s'
kinesis.client.numRetries: 10
kinesis.client.retryIntervalMs: 3000
kinesis.client.maxRetryIntervalMs: 10000
kinesis.client.avoidEmptyBatches: false
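For context, options like these are passed to the connector through Spark's `readStream` options. A minimal PySpark sketch, assuming a Spark session with the kinesis-sql connector jar on the classpath and a placeholder stream name (the option keys are copied verbatim from the list above):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kinesis-reader").getOrCreate()

# Sketch only: "my-stream" is a placeholder; keys mirror the configs above.
df = (spark.readStream
      .format("kinesis")
      .option("streamName", "my-stream")
      .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
      .option("startingPosition", "latest")
      .option("failondataloss", "false")
      .option("kinesis.executor.maxFetchTimeInMs", "60000")
      .option("kinesis.executor.maxFetchRecordsPerShard", "1000000")
      .option("kinesis.executor.maxRecordPerRead", "10000")
      .option("kinesis.executor.addIdleTimeBetweenReads", "true")
      .option("kinesis.executor.idleTimeBetweenReadsInMs", "1000")
      .option("kinesis.client.describeShardInterval", "1s")
      .load())
```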

Here's the issue I'm having. Imagine a few records are pushed at 00:00:00 and the next set at 20:00:00, so there is roughly a 20-hour gap of no put activity between the two sets in the Kinesis stream. After fix 5bd378b was introduced in 1.2.0, Spark is unable to fetch the later records from 20:00:00 because of that long idle gap. Removing the idle time between reads and increasing the max fetch time seems to help it reach the later set of records. When I switch back to 1.1.4, the configs above work, but that version doesn't honor the max fetch time, so Spark spends more time catching up to the tip of the stream and may block other Spark jobs until it finishes.
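The interaction described above can be illustrated with a toy model (this is NOT the connector's actual code, just a sketch of the mechanism): if the fetch loop sleeps between empty reads and stops once the fetch budget is spent, a long run of empty `GetRecords` responses inside an activity gap can exhaust the whole window before any later records are reached.

```python
def fetch_batch(stream, start, max_fetch_ms, idle_ms, cost_per_read_ms=10):
    """Toy model of one micro-batch fetch loop (not the connector's code).

    `stream` is a list where each slot holds the records returned by one
    GetRecords call; empty slots model calls inside a "no put activity" gap.
    """
    elapsed = 0
    pos = start
    out = []
    while pos < len(stream) and elapsed < max_fetch_ms:
        records = stream[pos]
        out.extend(records)
        pos += 1
        elapsed += cost_per_read_ms
        if not records:
            # models addIdleTimeBetweenReads: sleep after an empty read
            elapsed += idle_ms
    return out, pos

# Records at the start, a long gap of empty reads, then more records --
# like the 00:00:00 / 20:00:00 scenario.
stream = [["r1"]] + [[]] * 500 + [["r2"]]

# With a 1s idle sleep and a 60s fetch budget, the batch spends its whole
# window sleeping through empty reads and never reaches "r2".
got_idle, _ = fetch_batch(stream, start=1, max_fetch_ms=60_000, idle_ms=1_000)

# Dropping the idle sleep lets the loop cross the gap within the budget.
got_no_idle, _ = fetch_batch(stream, start=1, max_fetch_ms=60_000, idle_ms=0)

print(got_idle)     # []
print(got_no_idle)  # ['r2']
```

This matches the observation that disabling the idle time between reads (or raising the max fetch time) lets the job get past the gap.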

What's the recommended approach in this case? Would using a timestamp as the starting offset work better?

WTa-hash (Author) commented:

It seems the issue was mentioned here as well: #90 (comment)

chadlagore (Contributor) commented:

Yeah, the alternative is that maxReadTimeInMs is ignored and the application continually polls the Kinesis read API and gets throttled. I think the appropriate fix is to migrate to SubscribeToShard, which uses an event-driven architecture to activate the streaming application. That is probably a large lift.
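For reference, SubscribeToShard pushes records to a registered enhanced fan-out consumer over an HTTP/2 event stream, so the application wakes only when data arrives instead of polling through long idle gaps. A rough boto3 sketch (not runnable as-is: it assumes a hypothetical, already-registered consumer ARN and AWS credentials):

```python
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

# ConsumerARN below is a hypothetical placeholder; a real consumer must
# first be registered with RegisterStreamConsumer.
response = kinesis.subscribe_to_shard(
    ConsumerARN="arn:aws:kinesis:us-east-1:123456789012:stream/my-stream/consumer/my-consumer:1",
    ShardId="shardId-000000000000",
    StartingPosition={"Type": "LATEST"},
)

# Records are pushed over the event stream for up to 5 minutes per
# subscription; the loop blocks until events arrive.
for event in response["EventStream"]:
    records = event["SubscribeToShardEvent"]["Records"]
    # process records here
```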

roncemer (Contributor) commented:

I sent an email to the maintainer of this repo and didn't get a response. For now, I've forked the project at https://github.com/roncemer/kinesis-spark-connector and updated it to build for Spark 3.2.1, under which it appears to work correctly. I let it run overnight with no new records being posted to the Kinesis data stream; when I started posting records again, they arrived in Spark and were processed.

I opened pull request https://github.com/qubole/kinesis-sql/pull/113/files from my fork back to the original qubole/kinesis-sql repo. Be sure to read the full description under the Comments tab as well.

I could use the help of anyone who might be interested in this project. qubole/kinesis-sql has apparently been abandoned for about two years, and the main maintainer doesn't respond to emails. If anyone can get in touch with someone who has control of this repo, please ask them to make me a committer. Barring that, I will have to publish the jar file from my forked repo as a Maven resource. In any case, if I end up maintaining this project, either the original or the forked repo, I will need volunteers to help handle builds each time a new version of Spark is released.

In the meantime, feel free to build your own jar from my forked repo and run it under Spark 3.2.1. Also, let me know whether it works under 3.3.x, or if you run into any other problems.

Thanks!
Ron Cemer
