
startingPosition AT_TIMESTAMP iteratorOption NumberFormatException #93

Open
adamlbailey opened this issue Oct 1, 2020 · 9 comments

@adamlbailey

Excellent addition for reading from the stream at specific positions, per #78.

However, I'm having trouble using the "AT_TIMESTAMP" option to read from shards at specific timestamps.

The shardInfo object I'm using is below:

{
  "metadata": {
    "streamName": "QSR-data-stream-production",
    "batchId": "1"
  },
  "shardId-000000000002": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "2020-09-30T19:58:46.480-00:00"
  },
  "shardId-000000000003": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "2020-09-30T19:58:46.480-00:00"
  },
  "shardId-000000000004": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "2020-09-30T19:58:46.480-00:00"
  },
  "shardId-000000000005": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "2020-09-30T19:58:46.480-00:00"
  }
}

Here is the exception:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 9, ip-10-0-9-81.us-west-2.compute.internal, executor 5): java.lang.NumberFormatException: For input string: "2020-09-30T19:58:46.480-00:00"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Long.parseLong(Long.java:589)
	at java.lang.Long.parseLong(Long.java:631)
	at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
	at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
	at org.apache.spark.sql.kinesis.KinesisReader.getShardIterator(KinesisReader.scala:120)
	at org.apache.spark.sql.kinesis.KinesisSourceRDD$$anon$1.getShardIterator(KinesisSourceRDD.scala:146)
	at org.apache.spark.sql.kinesis.KinesisSourceRDD$$anon$1.getNext(KinesisSourceRDD.scala:197)
	at org.apache.spark.sql.kinesis.KinesisSourceRDD$$anon$1.getNext(KinesisSourceRDD.scala:138)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
@adamlbailey
Author

Update:

Using an epoch-milliseconds timestamp instead of an ISO-formatted string solves the problem. It would be nice to support ISO strings as well, for readability and for compatibility with the options afforded by other Kinesis libraries. I'll consider opening a PR to address this soon.

{
  "metadata": {
    "streamName": "QSR-data-stream-production",
    "batchId": "1"
  },
  "shardId-000000000002": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "1601495926480"
  },
  "shardId-000000000003": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "1601495926480"
  },
  "shardId-000000000004": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "1601495926480"
  },
  "shardId-000000000005": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "1601495926480"
  }
}
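
For anyone else hitting this: since the connector appears to parse iteratorPosition as a Long (per the stack trace above), an ISO timestamp has to be converted to epoch milliseconds first. A minimal sketch of such a conversion using only the Python standard library (the helper name is mine, not part of the connector):

```python
from datetime import datetime, timezone

def iso_to_epoch_millis(iso_string: str) -> str:
    """Convert an ISO-8601 timestamp to the epoch-milliseconds string
    that the connector's iteratorPosition field expects."""
    dt = datetime.fromisoformat(iso_string)
    if dt.tzinfo is None:  # treat naive timestamps as UTC
        dt = dt.replace(tzinfo=timezone.utc)
    return str(round(dt.timestamp() * 1000))

print(iso_to_epoch_millis("2020-09-30T19:58:46.480+00:00"))  # prints 1601495926480
```

Note that `datetime.fromisoformat` requires Python 3.7+ and, before 3.11, only accepts the strict `YYYY-MM-DDTHH:MM:SS[.ffffff][±HH:MM]` forms.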

@itsvikramagr
Contributor

@adamlbailey - Looking forward to the PR.

@gopi-t2s

Hi @adamlbailey,

I am trying to read data from Kinesis using AT_TIMESTAMP as the starting position.
Here is the piece of code I'm using to achieve this:

pos = json.dumps({"at_timestamp": "02/26/2021 3:07:13 PDT"})
kinesisDF = (spark.readStream.format("kinesis")
    .option("streamName", name)
    .option("endpointUrl", URL)
    .option("awsAccessKeyId", key)
    .option("awsSecretKey", sKey)
    .option("startingposition", pos)
    .load())

Here is the error message I receive:

pyspark.sql.utils.IllegalArgumentException: 'org.json4s.package$MappingException: Expected object but got JString(02/26/2021 3:07:13 PDT)'

I'm new to this Kinesis connector, and I know the way I'm passing the value for the starting position is wrong. Could you help me pass an AT_TIMESTAMP value for the startingposition option?

Thanks in Advance!

@adamlbailey
Author

Hi @gopi-t2s, I'm somewhat removed from this work now, but if memory serves:

You'll want to construct an object like the one in my previous comment. Practically, I did this by writing a helper that described the stream, so I could list each shard with the right timestamp Long value.
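A rough sketch of what such a helper might look like (the function name, stream name, and shard IDs here are illustrative; a real version would fetch the shard IDs from the stream, e.g. via the Kinesis ListShards API):

```python
import json

def build_starting_position(stream_name, shard_ids, epoch_millis, batch_id="1"):
    """Build the per-shard startingPosition JSON, pointing every shard
    at the same AT_TIMESTAMP position (epoch milliseconds)."""
    info = {"metadata": {"streamName": stream_name, "batchId": batch_id}}
    for shard_id in shard_ids:
        info[shard_id] = {
            "iteratorType": "AT_TIMESTAMP",
            "iteratorPosition": str(epoch_millis),
        }
    return json.dumps(info)

# Shard IDs listed by hand here; a real helper would call something like
# boto3's kinesis.list_shards(StreamName=...) and read the ShardId fields.
pos = build_starting_position(
    "my-stream",
    ["shardId-000000000000", "shardId-000000000001"],
    1601495926480,
)
```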

@gopi-t2s

Thanks @adamlbailey for your inputs.

@chadlagore
Contributor

I ran into this as well, @gopi-t2s. Were you able to make it work? I was unsure whether PySpark was going to be supported for this.

@gopi-t2s

No @chadlagore, I am still looking for a way to attain this.

@nikitira

I got it working with the example above from @adamlbailey.

Minimal example in PySpark:

import json
from datetime import datetime

now_ts = str(int(datetime.now().timestamp() * 1000))  # timestamp in epoch-milliseconds format, e.g. "1601495926480"
from_timestamp = {
  "metadata": {
    "streamName": "my-stream",
    "batchId": "1"
  },
  "shardId-000000000000": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition":  now_ts
  }
}

starting_position = json.dumps(from_timestamp)

my_stream = (spark
    .readStream
    .format('kinesis')
    .option('streamName', "my-stream")
    .option('endpointUrl', KINESIS_ENDPOINT)
    .option('region', KINESIS_REGION)
    .option('startingposition', starting_position)
    .load())

Hope this helps @chadlagore @gopi-t2s

@gopi-t2s

Thank you @nikitira, I will try this.


No branches or pull requests

5 participants