Data not receiving from Kinesis #107

Open · success-m opened this issue Jan 28, 2022 · 8 comments

Comments
@success-m

Hi,

I am doing a POC with Kinesis using this connector, and I am hoping to use it in production. When "TRIM_HORIZON" is used on a newly created stream, things work fine, but when I try the same stream again the next day, it does not work: the data sent to Kinesis is not processed by Spark. Is there any solution to this?

Thank you in advance.
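
For reference, a minimal PySpark consumer along these lines matches the setup described; the option names (streamName, endpointUrl, startingposition) are the ones documented in the connector's README, while the stream name, region, checkpoint path, and credential handling are placeholders/assumptions:

```python
from pyspark.sql import SparkSession

# Rough sketch of the consumer described above. Stream name, region, and
# checkpoint path are placeholders; AWS credentials are assumed to come from
# the default credential chain (extra auth options omitted).
spark = SparkSession.builder.appName("kinesis-poc").getOrCreate()

events = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "my-poc-stream")                        # placeholder
    .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
    .option("startingposition", "TRIM_HORIZON")                    # fine on a fresh stream, stalls later per this issue
    .load()
)

# The connector exposes the record payload in a binary `data` column (per its README).
query = (
    events.selectExpr("CAST(data AS STRING) AS payload")
    .writeStream
    .format("console")
    .option("checkpointLocation", "/tmp/kinesis-poc-checkpoint")   # placeholder path
    .start()
)
query.awaitTermination()
```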

@rae89 commented Feb 2, 2022

I am having this same issue.

@success-m (Author)

@rae89 - I went with "LATEST". It seems to work fine now, but we need to make sure the checkpoints don't go away.
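
As a sketch of that workaround (reusing the session from the snippet in the first comment), only the starting position and a durable checkpoint location change; the stream name and paths are again placeholders:

```python
# Workaround sketch: start from LATEST and keep the checkpoint on durable storage
# (e.g. S3) so a restart resumes from the last committed position instead of "now".
events = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "my-poc-stream")             # placeholder
    .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
    .option("startingposition", "LATEST")
    .load()
)

query = (
    events.selectExpr("CAST(data AS STRING) AS payload")
    .writeStream
    .format("console")
    .option("checkpointLocation", "s3://my-bucket/checkpoints/kinesis-poc")  # placeholder; must survive restarts
    .start()
)
```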

@BreakpointsCA

I have the same issue. However, when I use "LATEST" it works well for new data, but it discards data already present in the stream, even if I delete the checkpoint and change the application name.

@success-m (Author)

@BreakpointsCA - Yes, this is the expected behavior for LATEST. However, the checkpoints are a life-saver: even if the Spark cluster fails, data ingestion resumes from the point of failure. If we choose to work with a new stream, we need to make sure we start the cluster first and only then issue put requests to Kinesis.
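
For illustration, the producer side of that ordering is just a plain Kinesis put; a boto3 sketch with a placeholder stream name and region, meant to run only after the streaming query is already up:

```python
import json
import boto3

# Producer-side sketch: only issue puts after the LATEST-based consumer is running,
# otherwise these records are skipped. Stream name and region are placeholders.
kinesis = boto3.client("kinesis", region_name="us-east-1")

kinesis.put_record(
    StreamName="my-poc-stream",
    Data=json.dumps({"event": "example", "value": 42}).encode("utf-8"),
    PartitionKey="example-key",
)
```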

@itsmesrds

@success-m I observed the same. Do you have a design doc on how consumption from Kinesis works? Looking at the "Implementation of Kinesis Connector" section of this doc, https://www.qubole.com/blog/kinesis-connector-for-structured-streaming, it looks like we need the same number of executors as Kinesis shards? Without that, I see a huge lag in consuming data from the other shards.

Can you please confirm?

@roncemer (Contributor)

I'm seeing the same issue. Basically, if the Kinesis data stream goes idle (no new records added) for an undetermined amount of time, fluentd (or whatever producer) can still put new records onto the stream, but the consumer never receives any more records.

I'm using version com.qubole.spark/spark-sql-kinesis_2.12/1.2.0_spark-3.0 under Amazon Elastic MapReduce with Spark 3.2.1. My consumer is a Spark job written in Python. My Kinesis data stream is set to auto-scale the number of shards up or down based on demand.
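
For reference, one way to attach that version to a PySpark session is via its Maven coordinate (written colon-separated); this is a sketch and assumes the artifact resolves from your configured repositories:

```python
from pyspark.sql import SparkSession

# Sketch: pull the connector by the Maven coordinate quoted above when building the session.
spark = (
    SparkSession.builder
    .appName("kinesis-consumer")
    .config("spark.jars.packages", "com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0")
    .getOrCreate()
)
```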

This is a pretty bad bug. It will be a show-stopper for the whole project if we can't get it fixed, unless there's an alternate Kinesis consumer that can be used instead.

@roncemer (Contributor)

I sent an email to the maintainer of this repo and didn't get a response. For now, I've forked the project at https://github.com/roncemer/kinesis-spark-connector and updated it to build for Spark 3.2.1. Under Spark 3.2.1, it appears to work correctly. I let it run overnight with no new records being posted to the Kinesis data stream; when I started posting records again, they arrived in Spark and were processed.

I issued pull request https://github.com/qubole/kinesis-sql/pull/113/files from my forked repo back to the original qubole/kinesis-sql repo. Be sure to read the full description under the Comments tab as well.

I could use the help of anyone who might be interested in this project. Apparently, qubole/kinesis-sql has been abandoned for about two years, and the main person who was maintaining it doesn't respond to emails. If anyone can get in touch with someone who has control of this repo, please ask them to make me a committer. Barring that, I will have to publish the jar file from my forked repo as a Maven artifact. In any case, if I end up maintaining this project, either the original or the forked repo, I will need volunteers to help handle the builds each time a new version of Spark is released.

In the meantime, feel free to build your own jar from my forked repo and run it under Spark 3.2.1. Also, let me know if it works under 3.3.x, or if you run into any other problems.
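
For anyone trying the fork before a published artifact exists, a sketch of pointing a session at a locally built jar (the path is a placeholder for whatever your build produces):

```python
from pyspark.sql import SparkSession

# Sketch: run against a locally built jar from the fork.
spark = (
    SparkSession.builder
    .appName("kinesis-consumer-forked")
    .config("spark.jars", "/path/to/spark-sql-kinesis_2.12.jar")  # placeholder path
    .getOrCreate()
)
```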

Thanks!
Ron Cemer

@success-m (Author)

@roncemer - Just saw this. Great to know that you are maintaining it now. :)
