
What may be causing and how to work around StreamingQueryException: Gave up after 3 retries while fetching MetaData? #110

Open
dgoldenberg-audiomack opened this issue Mar 14, 2022 · 2 comments


dgoldenberg-audiomack commented Mar 14, 2022

Spark 3.1.1 running on AWS EMR 6.3.0, Python 3.7.2

I'm getting the following error:

  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 101, in awaitTermination
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
pyspark.sql.utils.StreamingQueryException: Gave up after 3 retries while fetching MetaData, last exception: 
=== Streaming Query ===
Identifier: [id = e825addf-9c21-4e9d-a05b-581ae8911f29, runId = e2ea753f-d2dc-42ea-bec2-17a516faadf7]
Current Committed Offsets: {KinesisSource[events-prod]: {"shardId-000000000035":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1647283749833"},"shardId-000000000041":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1647283749833"},"shardId-000000000044":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1647283749833"},"shardId-000000000038":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1647283749833"},"shardId-000000000032":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1647283749833"},"shardId-000000000043":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1647283749833"},"metadata":{"streamName":"events-prod","batchId":"0"},"shardId-000000000031":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1647283749833"},"shardId-000000000034":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1647283749833"},"shardId-000000000040":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1647283749833"},"shardId-000000000037":
.................................................................

I have tried increasing the maximum number of retries and the retry interval, e.g.:

MAX_NUM_RETRIES = 10  # default is 3
RETRY_INTERVAL_MS = 3000  # default is 1000
MAX_RETRY_INTERVAL_MS = 30000  # default is 10000

df = (
    spark.readStream.format("kinesis")
    .option("streamName", pctx.stream_name)
    .option("endpointUrl", pctx.endpoint_url)
    .option("region", pctx.region_name)
    .option("checkpointLocation", pctx.checkpoint_path)
    .option("startingposition", "LATEST")
    .option("kinesis.client.numRetries", MAX_NUM_RETRIES)
    .option("kinesis.client.retryIntervalMs", RETRY_INTERVAL_MS)
    .option("kinesis.client.maxRetryIntervalMs", MAX_RETRY_INTERVAL_MS)
    .load()
)

but the connector still appears to hold onto the default value of 3 retries.

Any ideas, anyone?

  • What may be causing this issue?
  • How to work around it? Would setting failondataloss=false be a good idea, or a bad one? (A hedged sketch follows below.)
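
For the second question, a minimal sketch of what that would look like, assuming the connector honors a failondataloss option (worth confirming against the connector docs before relying on it):

# Hypothetical sketch only: downgrade detected data loss from a hard failure.
# Whether this connector version honors "failondataloss" is an assumption here,
# and disabling it can mean silently skipping lost records rather than failing the query.
# spark and pctx are reused from the snippet above.
df = (
    spark.readStream.format("kinesis")
    .option("streamName", pctx.stream_name)
    .option("endpointUrl", pctx.endpoint_url)
    .option("region", pctx.region_name)
    .option("startingposition", "LATEST")
    .option("failondataloss", "false")
    .load()
)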

Thanks

@dgoldenberg-audiomack (Author)

From stderr.out on EMR.

Any ideas as to why the files HDFSMetadataCommitter is looking for might be getting deleted, or might never have existed? (A note on the checkpoint path follows the stack trace below.)

22/03/15 00:54:00 WARN HDFSMetadataCommitter: Error while fetching MetaData [attempt = 1]
java.lang.IllegalStateException: hdfs://ip-10-2-XXX-XXX.awsinternal.acme.com:8020/mnt/tmp/temporary-03b8fecf-32d5-422c-9375-4c3450ed0bb8/sources/0/shard-commit/0 does not exist
    at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.$anonfun$get$1(HDFSMetadataCommitter.scala:163)
    at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229)
    at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:151)
    at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:275)
    at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:163)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$6(MicroBatchExecution.scala:399)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:399)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:382)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:613)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:378)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:211)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
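
One thing that stands out in the path above: /mnt/tmp/temporary-03b8fecf-... looks like the auto-generated checkpoint directory Spark falls back to when no checkpointLocation is set on the query itself. In Structured Streaming that option belongs on writeStream (or the spark.sql.streaming.checkpointLocation conf), not on readStream, so the reader-side option in the snippet above may simply be ignored. A minimal sketch of pinning it to a durable path, reusing pctx.checkpoint_path from above with a placeholder console sink:

# Sketch: supply the checkpoint location on the query (writeStream), not the reader,
# so shard-commit metadata is written under a durable path instead of /mnt/tmp/temporary-*.
# The console sink is only a placeholder for the job's real sink.
query = (
    df.writeStream.format("console")
    .option("checkpointLocation", pctx.checkpoint_path)
    .start()
)
query.awaitTermination()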

@dgoldenberg-audiomack (Author)

This issue I've filed looks like a duplicate of #57.
