[Bug]: Python expansion with multiple SqlTransforms is extremely slow #31227

Open · 1 of 16 tasks

gergely-g opened this issue May 9, 2024 · 7 comments

gergely-g commented May 9, 2024

What happened?

When building a Pipeline with multiple SqlTransforms from Beam Python, the expansion that happens in SqlTransforms is currently (Beam 2.55.0) extremely inefficient.

This inefficiency has multiple sources.

  1. By default, a new BeamJarExpansionService() is started for each SqlTransform.
  2. The ResolveArtifacts call will unconditionally download the 300MB beam-sdks-java-extensions-sql-expansion-service-2.55.0.jar to a temporary directory for each step (!).

The latter dominates execution time.
For example, running Beam on a 4 vCPU (2 core), 16 GB memory machine (a standard Dataflow Workbench setup), a Pipeline with 31 trivial SQL transforms takes 200 seconds to execute.
(See example below.)

We found a somewhat dirty workaround that speeds things up by skipping SqlTransform._resolve_artifacts() altogether when working from inside Jupyter.

This brings the execution time down from 200 s to 22 s.

I suspect these inefficiencies also contribute to beam_sql being extremely slow even for trivial queries.

apache_beam/runners/portability/artifact_service.py contains this code snippet that might be one of the culprits for this inefficiency (note the trailing `and False`, which makes the cache check dead code):

    if os.path.exists(
        payload.path) and payload.sha256 and payload.sha256 == sha256(
            payload.path) and False:
      return artifact
    else:
      return store_artifact(artifact, service, dest_dir)
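For reference, here is a minimal self-contained sketch of what the check presumably should do once the dead `and False` is removed. The helpers `file_sha256` and `maybe_store_artifact` are simplified stand-ins for Beam's actual `sha256()` and `store_artifact()`, not the real implementations:

```python
import hashlib
import os
import shutil


def file_sha256(path):
    """Hex SHA-256 digest of a file, analogous to Beam's sha256() helper."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 16), b""):
            h.update(chunk)
    return h.hexdigest()


def maybe_store_artifact(path, expected_sha256, dest_dir):
    """Return the existing path on a verified cache hit; otherwise copy
    the artifact into dest_dir (a stand-in for store_artifact)."""
    # Without the stray `and False`, a matching hash short-circuits the copy.
    if os.path.exists(path) and expected_sha256 \
            and expected_sha256 == file_sha256(path):
        return path
    dest = os.path.join(dest_dir, os.path.basename(path))
    shutil.copy(path, dest)
    return dest
```

With the condition restored, an artifact whose on-disk hash matches the payload's hash is returned as-is instead of being re-downloaded or re-copied.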

In addition, once the ExpansionService is cached, the actual SQL expansion takes only 100-200 ms, but the ArtifactRetrievalService.ResolveArtifacts() call takes 1.5 s per SQL query even without downloading the actual files. This dominates the expansion time, which in turn dominates the overall time to launch and run a pipeline.

So the hotspot call sequence is something like:

  • SqlTransform.expand()
  • ExternalTransform.expand()
  • ArtifactRetrievalService.ResolveArtifacts()

The times may not sound like much, but the latency is bad enough to ruin the Jupyter REPL experience when combining Python + SQL.
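To confirm that breakdown in your own environment, you can wrap the suspect methods in a timing shim before constructing the pipeline. This is a generic sketch; the `instrument` helper is hypothetical, and the commented-out Beam names are the targets from the call sequence above (their exact import paths are an assumption):

```python
import functools
import time


def instrument(cls, method_name, log=print):
    """Monkey-patch cls.method_name to report wall-clock duration per call."""
    original = getattr(cls, method_name)

    @functools.wraps(original)
    def timed(*args, **kwargs):
        start = time.monotonic()
        try:
            return original(*args, **kwargs)
        finally:
            log(f"{cls.__name__}.{method_name} took "
                f"{time.monotonic() - start:.3f}s")

    setattr(cls, method_name, timed)


# Usage against Beam (assumed import path, patch before building the pipeline):
# from apache_beam.runners.portability.artifact_service import (
#     ArtifactRetrievalService)
# instrument(ArtifactRetrievalService, 'ResolveArtifacts')
```

Each patched call then prints its duration, making it easy to see whether artifact resolution really accounts for the ~1.5 s per transform.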

Code to reproduce the issue and demonstrate the workaround:

import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform
from apache_beam.transforms.external import BeamJarExpansionService
import time

# By default this pipeline of 31 SQL transforms takes 200s to execute.
# With the short-circuiting below execution time is reduced to 22s.

class FasterSqlTransform(SqlTransform):
    def _resolve_artifacts(self, components, service, dest):
        # Short circuit unnecessary call that results in the downloading of the 300MB
        # beam-sdks-java-extensions-sql-expansion-service-2.55.0.jar for each transform.
        return components

start_time = time.time()

# FasterSqlTransform with naive but shared BeamJarExpansionService
with beam.Pipeline() as p:
    with BeamJarExpansionService(
            ':sdks:java:extensions:sql:expansion-service:shadowJar'
    ) as shared_expansion_service:
        sql_result = (
            p
            | "Begin SQL" >> FasterSqlTransform(
                "SELECT 'hello' AS message, 1 AS counter",
                expansion_service=shared_expansion_service))
        for i in range(30):
            sql_result = (
                sql_result
                | f"SQL {i}" >> FasterSqlTransform(
                    "SELECT message, counter + 1 AS counter FROM PCOLLECTION",
                    expansion_service=shared_expansion_service))
        sql_result | beam.LogElements()

print(f"Pipeline took {time.time() - start_time} s to execute")

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
liferoad (Collaborator) commented May 9, 2024

The workaround looks good for the Python Direct Runner. @tvalentyn

tvalentyn (Contributor) commented

cc: @chamikaramj

chamikaramj (Contributor) commented

> The ResolveArtifacts call will unconditionally download the 300MB beam-sdks-java-extensions-sql-expansion-service-2.55.0.jar to a temporary directory for each step

The downloaded jars should be cached. Perhaps this caching doesn't work in your environment?

You also have the option of manually specifying the jar [1] or manually starting up an expansion service [2].

[1] --beam_services="{\":sdks:java:extensions:sql:expansion-service:shadowJar\": \"$EXPANSION_SERVICE_JAR\"}"
[2] https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/#choose-an-expansion-service
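As a sketch of those two options in practice (the port number is arbitrary, and `$EXPANSION_SERVICE_JAR` must point at a locally available copy of the jar):

```shell
# Option [2]: start the SQL expansion service once, outside the pipeline,
# so every SqlTransform reuses it instead of spawning its own:
java -jar beam-sdks-java-extensions-sql-expansion-service-2.55.0.jar 8097

# Then point each transform at it from Python:
#   SqlTransform("SELECT ...", expansion_service='localhost:8097')

# Option [1]: tell Beam to use a pre-downloaded jar instead of fetching it:
python pipeline.py \
  --beam_services='{":sdks:java:extensions:sql:expansion-service:shadowJar": "'"$EXPANSION_SERVICE_JAR"'"}'
```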

gergely-g (Author) commented

@chamikaramj The cache hit will never be detected for the downloaded JARs because of the `and False` condition in the artifact_service.py snippet quoted above: it always evaluates to False.

A worse problem, though, is that, as mentioned above, the ArtifactRetrievalService.ResolveArtifacts() call takes 1.5 s per SQL query even without downloading the actual files.

liferoad (Collaborator) commented

@robertwb can you check this?

wollowizard commented

Hi, any news? I have encountered this exact same issue.

chamikaramj (Contributor) commented

The `and False` part does seem like a bug, but I don't think that code path actually gets hit, since the Java expansion response serves Beam artifacts as DEFERRED artifacts that are retrieved from the locally available expansion service (so the URN is DEFERRED, not FILE).

https://github.com/apache/beam/blob/fed6489124000b3f222dc444136009ab22e4846e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Environments.java#L452C31-L452C48

The expansion service jar is cached elsewhere when the expansion service starts up and is served to the Python side via the ArtifactRetrievalService.ResolveArtifacts() API. That might be what adds the O(seconds) per-query delay you are observing, unfortunately.

    cached_jar = os.path.join(cache_dir, os.path.basename(url))
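That `cached_jar` line keys the cache by the jar's basename. A self-contained sketch of that lookup pattern (simplified; `download` is a caller-supplied stand-in for the actual fetch, not Beam's downloader):

```python
import os


def get_or_download_jar(url, cache_dir, download):
    """Reuse a previously downloaded jar if one with the same basename
    already exists in cache_dir; otherwise fetch it exactly once."""
    os.makedirs(cache_dir, exist_ok=True)
    # Cache key is the basename of the URL, mirroring the line quoted above.
    cached_jar = os.path.join(cache_dir, os.path.basename(url))
    if not os.path.exists(cached_jar):
        download(url, cached_jar)
    return cached_jar
```

When this cache works, the 300 MB jar is fetched once per machine rather than once per transform.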


No branches or pull requests

6 participants