[Bug]: Improve handling of 'not found' BigQuery dataset/table errors with appropriate retry policy #31226

Open
yu-iskw opened this issue May 9, 2024 · 0 comments

What happened?

Overview

Our Dataflow pipeline transfers data from Google Pub/Sub to BigQuery. It uses a custom dynamic destination to determine the target BigQuery table from the JSON content of each Pub/Sub message, along the lines of the sketch below.
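
The actual CustomBigQueryDynamicDestination is not included in this issue; the following is only a minimal, hypothetical sketch of such a destination. The "event_type" routing field and the table spec are illustrative assumptions, not our real implementation:

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.values.ValueInSingleWindow;

// Hypothetical dynamic destination: routes each row to a table named after an
// assumed "event_type" field in the parsed Pub/Sub message.
class CustomBigQueryDynamicDestination extends DynamicDestinations<TableRow, String> {

  @Override
  public String getDestination(ValueInSingleWindow<TableRow> element) {
    // Pick the table name from a field of the JSON payload (assumed field name).
    return (String) element.getValue().get("event_type");
  }

  @Override
  public TableDestination getTable(String tableName) {
    // [PROJECT_ID] and [DATASET_ID] stand in for the masked values used below.
    return new TableDestination("[PROJECT_ID]:[DATASET_ID]." + tableName, null);
  }

  @Override
  public TableSchema getSchema(String tableName) {
    // No schema is needed because the pipeline uses CreateDisposition.CREATE_NEVER.
    return null;
  }
}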

We have identified a recurring issue where the pipeline encounters a failure if the specified destination table is absent in BigQuery, resulting in a 404 Not Found error. The expected behavior is for the pipeline to manage such errors gracefully and attempt retries according to the defined retry policy.

Despite configuring the pipeline with InsertRetryPolicy.neverRetry(), it still terminates with the same error whenever it encounters a non-existent table. As a result, the failed bundle is retried and the pipeline keeps attempting to insert the invalid data, as described in #20211.

Environment

  • Apache Beam Java SDK: 2.56.0

Desired behavior

We aim to customize error handling and retry operations according to a specified retry policy. For example, we could implement a custom retry policy that specifically avoids retrying operations that result in a 404 Not Found error.
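
For illustration, such a custom policy might look like the sketch below. This is only a sketch, written under the assumption that "notFound" insert errors were surfaced to the retry policy at all; today the 404 is thrown as a GoogleJsonResponseException before any InsertRetryPolicy is consulted, so a policy like this currently has no effect.

import com.google.api.services.bigquery.model.ErrorProto;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;

// Hypothetical policy: never retry rows whose insert errors report the
// "notFound" reason; delegate everything else to the built-in transient-error
// behavior.
public class NoRetryOnNotFoundPolicy extends InsertRetryPolicy {

  @Override
  public boolean shouldRetry(Context context) {
    if (context.getInsertErrors() != null && context.getInsertErrors().getErrors() != null) {
      for (ErrorProto error : context.getInsertErrors().getErrors()) {
        if ("notFound".equals(error.getReason())) {
          // Do not retry inserts against a missing table or dataset.
          return false;
        }
      }
    }
    // For all other failures, behave like InsertRetryPolicy.retryTransientErrors().
    return InsertRetryPolicy.retryTransientErrors().shouldRetry(context);
  }
}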

Sample code

Here is a snippet of the code that configures the pipeline to write to BigQuery:

WriteResult writeResult =
    convertedTableRows
        .get(TRANSFORM_OUT)
        .apply(
            "WriteSuccessfulRecords",
            BigQueryIO.writeTableRows()
                .withoutValidation()
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withExtendedErrorInfo()
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
                .ignoreUnknownValues()
                .to(new CustomBigQueryDynamicDestination()));
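
Because the write uses withExtendedErrorInfo(), our expectation is that failed rows would surface on WriteResult.getFailedInsertsWithErr() instead of crashing the bundle. The snippet below is an illustrative sketch of how such failures could then be consumed; the LogFailedInsertsFn name and the logging behavior are our own and not part of the pipeline above.

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Logs each failed insert; a real pipeline might route these to a dead-letter sink instead. */
class LogFailedInsertsFn extends DoFn<BigQueryInsertError, Void> {
  private static final Logger LOG = LoggerFactory.getLogger(LogFailedInsertsFn.class);

  @ProcessElement
  public void processElement(@Element BigQueryInsertError insertError) {
    LOG.warn("Failed insert for row {}: {}", insertError.getRow(), insertError.getError());
  }
}

// Hooked onto the WriteResult from the snippet above:
writeResult.getFailedInsertsWithErr().apply("LogFailedInserts", ParDo.of(new LogFailedInsertsFn()));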

Error messages

We have masked sensitive information, such as the project ID and dataset ID, in the error messages below (one for a missing table, one for a missing dataset):

Error message from worker: java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
POST https://bigquery.googleapis.com/bigquery/v2/projects/[PROJECT_ID]/datasets/[DATASET_ID]/tables/no_such_table/insertAll?prettyPrint=false
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "Not found: Table [PROJECT_ID]:[DATASET_ID].no_such_table",
    "reason" : "notFound"
  } ],
  "message" : "Not found: Table [PROJECT_ID]:[DATASET_ID].no_such_table",
  "status" : "NOT_FOUND"
}
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:1240)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:1303)
        org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.flushRows(BatchedStreamingWrite.java:403)
        org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.access$900(BatchedStreamingWrite.java:67)
        org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements.finishBundle(BatchedStreamingWrite.java:286)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
POST https://bigquery.googleapis.com/bigquery/v2/projects/[PROJECT_ID]/datasets/[DATASET_ID]/tables/no_such_table/insertAll?prettyPrint=false
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "Not found: Table [PROJECT_ID]:[DATASET_ID].no_such_table",
    "reason" : "notFound"
  } ],
  "message" : "Not found: Table [PROJECT_ID]:[DATASET_ID].no_such_table",
  "status" : "NOT_FOUND"
}
        com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
        com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
        com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:439)
        com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1111)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:525)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:466)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:576)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$InsertBatchofRowsCallable.call(BigQueryServicesImpl.java:985)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$InsertBatchofRowsCallable.call(BigQueryServicesImpl.java:932)
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:75)
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
        java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        java.util.concurrent.FutureTask.run(FutureTask.java:266)
        org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        java.lang.Thread.run(Thread.java:750)
java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
POST https://bigquery.googleapis.com/bigquery/v2/projects/[PROJECT_ID]/datasets/[DATASET_ID]/tables/event/insertAll?prettyPrint=false
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "Not found: Dataset [PROJECT_ID]:[DATASET_ID]",
    "reason" : "notFound"
  } ],
  "message" : "Not found: Dataset [PROJECT_ID]:[DATASET_ID]",
  "status" : "NOT_FOUND"
}
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:1240)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:1303)
        org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.flushRows(BatchedStreamingWrite.java:403)
        org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.access$900(BatchedStreamingWrite.java:67)
        org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements.finishBundle(BatchedStreamingWrite.java:286)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
POST https://bigquery.googleapis.com/bigquery/v2/projects/[PROJECT_ID]/datasets/[DATASET_ID]/tables/event/insertAll?prettyPrint=false
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "Not found: Dataset [PROJECT_ID]:[DATASET_ID]",
    "reason" : "notFound"
  } ],
  "message" : "Not found: Dataset [PROJECT_ID]:[DATASET_ID]",
  "status" : "NOT_FOUND"
}
        com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
        com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
        com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:439)
        com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1111)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:525)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:466)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:576)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$InsertBatchofRowsCallable.call(BigQueryServicesImpl.java:985)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$InsertBatchofRowsCallable.call(BigQueryServicesImpl.java:932)
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:75)
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
        java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        java.util.concurrent.FutureTask.run(FutureTask.java:266)
        org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        java.lang.Thread.run(Thread.java:750)

Issue Priority

Priority: 3 (minor)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner