Kinesis Application State in Dynamo DB #22

Open
VikramBPurohit opened this issue Sep 12, 2018 · 1 comment
Labels: enhancement (New feature or request), good first issue (Good for newcomers)

Comments

VikramBPurohit commented Sep 12, 2018

Hi @itsvikramagr,

Thanks again for writing this library and for your constant support of the community.

Our Spark Structured Streaming code doesn't need to be stateful. We are OK with the cluster or executors going down, as long as Kinesis is managing the consumer's application state.

https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-ddb.html

Could you shed some light on:

  1. Why isn't Kinesis able to manage the offset/application state in DynamoDB, even though kinesis-sql calls the DescribeStream API?
  2. I don't see any place in the kinesis-sql code that sets an application name for Kinesis.

Thanks in advance for your response!

itsvikramagr (Contributor) commented Sep 14, 2018

The Kinesis SQL library does not use the KCL for most of its functionality. The KCL and the Structured Streaming APIs do not work well together. Also, the KCL comes with the extra cost of DynamoDB, which can be avoided for Structured Streaming.

At the same time, while designing kinesis-sql, we knew that users would want a more reliable shard-progress committer, so the module provides a pluggable committer. We can do the following to use another committer:

  // Select the shard-metadata committer implementation based on the source option.
  private def metadataCommitter: MetadataCommitter[ShardInfo] = {
    metaDataCommitterType.toLowerCase(Locale.ROOT) match {
      case "hdfs" =>
        new HDFSMetadataCommitter[ShardInfo](metaDataCommitterPath, hadoopConf(sqlContext))
      case _ => throw new IllegalArgumentException("only HDFS is supported")
    }
  }

  // Read the committer type from the source options, defaulting to HDFS.
  private def metaDataCommitterType: String = {
    sourceOptions.getOrElse("executor.metadata.committer", "hdfs").toString
  }
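
For reference, a rough sketch of how the committer option would be set when creating the stream. Only executor.metadata.committer comes from the snippet above; the remaining option names and values are placeholders for illustration, not authoritative.

  // Illustrative only: stream name and endpoint are placeholders.
  val kinesisDF = spark.readStream
    .format("kinesis")
    .option("streamName", "my-stream")                                // placeholder stream name
    .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com") // placeholder endpoint
    .option("executor.metadata.committer", "hdfs")                    // currently the only supported committer
    .load()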

I am relying on the community to implement a DynamoDB committer. cc @VikramBPurohit
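
For anyone picking this up, here is a rough standalone sketch of what such a committer could look like. It is assumption-heavy: it is not wired into the MetadataCommitter[ShardInfo] trait (take the exact interface from the HDFS committer in the repo), the class name, table name, and key schema are placeholders, and only the AWS SDK DynamoDB calls (putItem/getItem) are real APIs.

  import scala.collection.JavaConverters._
  import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
  import com.amazonaws.services.dynamodbv2.model.{AttributeValue, GetItemRequest, PutItemRequest}

  // Hypothetical sketch: stores serialized shard state keyed by (batchId, shardId)
  // in a pre-created DynamoDB table with a composite primary key.
  class DynamoDBShardCommitter(tableName: String) {

    private val dynamo: AmazonDynamoDB = AmazonDynamoDBClientBuilder.defaultClient()

    // Persist the serialized ShardInfo for one batch/shard pair.
    def add(batchId: Long, shardId: String, shardInfoJson: String): Unit = {
      val item = Map(
        "batchId"  -> new AttributeValue().withN(batchId.toString),
        "shardId"  -> new AttributeValue().withS(shardId),
        "metadata" -> new AttributeValue().withS(shardInfoJson)
      ).asJava
      dynamo.putItem(new PutItemRequest().withTableName(tableName).withItem(item))
    }

    // Read the serialized ShardInfo back, if it was committed.
    def get(batchId: Long, shardId: String): Option[String] = {
      val key = Map(
        "batchId" -> new AttributeValue().withN(batchId.toString),
        "shardId" -> new AttributeValue().withS(shardId)
      ).asJava
      val result = dynamo.getItem(new GetItemRequest().withTableName(tableName).withKey(key))
      Option(result.getItem).map(_.get("metadata").getS)
    }
  }

Wiring it in would then be a matter of adding a "dynamodb" case to metadataCommitter above, plus DynamoDB-specific source options (table name, region, credentials).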

itsvikramagr added the good first issue (Good for newcomers) label on Sep 14, 2018
itsvikramagr added the enhancement (New feature or request) label on May 15, 2020