[SUPPORT] Hudi MOR high latency on data availability #11118

Open
sgcisco opened this issue Apr 29, 2024 · 4 comments
Labels
performance, priority:major (degraded perf; unable to move forward; potential bugs)

Comments

@sgcisco

sgcisco commented Apr 29, 2024

Describe the problem you faced

Running a streaming pipeline of Kafka → Structured Streaming (PySpark) → Hudi (MOR tables) with AWS Glue and S3, we observed periodically growing latencies in data availability on the Hudi side.
Latency was measured as the difference between the data generation timestamp and _hoodie_commit_time and could reach up to 30 min. Periodic manual checks of the latest available data-point timestamps, using snapshot queries as described in https://hudi.apache.org/docs/0.13.1/querying_data#spark-snap-query, confirmed these delays.
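A minimal sketch of the kind of snapshot query used for these manual checks (the S3 base path and the event_timestamp column name are placeholders, not our real ones):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    # Snapshot read of the MOR table; _hoodie_commit_time is the Hudi metadata column,
    # event_timestamp stands in for the data generation timestamp column.
    latest = (
        spark.read.format("hudi")
        .load("s3://bucket/path/to/table/")          # placeholder base path
        .select("_hoodie_commit_time", "event_timestamp")
        .orderBy(F.col("_hoodie_commit_time").desc())
        .limit(20)
    )
    latest.show(truncate=False)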

[screenshot]

[screenshot]

When Spark wrote to Hudi, the read rate from Kafka was unstable:

[screenshot: 2024-04-29 11:49]

To rule out components other than Hudi, we ran experiments with the same configuration and ingestion settings but writing directly to S3 without Hudi. These runs showed no delays above 2 min (a 1 min delay is always present due to the Structured Streaming micro-batch granularity), and the Kafka read rate stayed stable over time.

Additional context

What we tried

  1. We tried to optimize Hudi file sizing and the MOR layout by applying suggestions from these references: [SUPPORT] How to run Periodic Compaction? Multiple Tables - When no Upserts #2151 (comment),
    https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-HowdoItoavoidcreatingtonsofsmallfiles,
    [SUPPORT] How to run Periodic Compaction? Multiple Tables - When no Upserts #2151 (comment)

We could reach target file sizes of 90-120 MB by lowering hoodie.copyonwrite.record.size.estimate from 1024 to 100 and setting inline compaction off, delta commits to 1, async compaction on, and hoodie.merge.small.file.group.candidates.limit to 20 (the full option keys are spelled out in the sketch after this list), but it had no impact on latency.

  2. Another compaction trigger strategy, NUM_OR_TIME, as suggested in [SUPPORT] compaction.delta_seconds is not working for compaction strategy num_or_time #8975 (comment), with the parameters below, did not resolve the problem either:
"hoodie.copyonwrite.record.size.estimate": "100",
"hoodie.compact.inline.trigger.strategy": "NUM_OR_TIME",
"hoodie.metadata.compact.max.delta.commits": "5",
"hoodie.compact.inline.max.delta.seconds": "60",

Current settings

As a trade-off we arrived at the configuration below, which gives us relatively low latencies at the 90th percentile and file sizes of 40-90 MB:

"hoodie.merge.small.file.group.candidates.limit": "40",
"hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",

[screenshot]

However, some records could still take up to 30 min to become available.

[screenshot]

This last configuration works relatively well for low ingestion rates up to 1.5 MB/s with daily partitioning (partition_date=yyyy-MM-dd/), but stops working for rates above 2.5 MB/s, even with more granular hourly partitioning (partition_date=yyyy-MM-dd-HH/).

Expected behavior

Since we use MOR tables, we expect:

  • low latency on data availability
  • proper file sizing as defined by the limits below (100 MB small-file limit, 120 MB max file size)
    "hoodie.parquet.small.file.limit" : "104857600",
    "hoodie.parquet.max.file.size" : "125829120",

Environment Description

  • Hudi version : 0.13.1

  • Spark version : 3.4.1

  • Hive version : 3.1

  • Hadoop version : EMR 6.13

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : No

Hudi configuration

                "hoodie.datasource.hive_sync.auto_create_database": "true",
                "hoodie.datasource.hive_sync.enable": "true",
                "hoodie.datasource.hive_sync.mode": "hms",
                "hoodie.datasource.hive_sync.table": table_name,
                "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
                "hoodie.datasource.hive_sync.use_jdbc": "false",
                "hoodie.datasource.hive_sync.database": _glue_db_name,
                "hoodie.datasource.write.hive_style_partitioning": "true",
                "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
                "hoodie.datasource.write.operation": "upsert",
                "hoodie.datasource.write.schema.allow.auto.evolution.column.drop": "true",
                "hoodie.datasource.write.table.name": table_name,
                "hoodie.datasource.write.table.type": "MERGE_ON_READ",
                "hoodie.datasource.write.table.name": table_name,
                "hoodie.metadata.index.bloom.filter.enable": "true",
                "hoodie.metadata.index.column.stats.enable": "true",
                "hoodie.table.name": table_name,
                "hoodie.parquet.small.file.limit" : "104857600",
                "hoodie.parquet.max.file.size" : "125829120",
                "hoodie.merge.small.file.group.candidates.limit": "40",
                "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",

Spark configuration

            "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
            "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
            # Glue support
            "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
            # Spark resource
            "spark.driver.cores": "4",
            "spark.driver.memory": "4400m",
            "spark.driver.memoryOverhead": "800m",
            "spark.executor.cores": "4",
           "spark.executor.memory": "4400m",
           "spark.executor.memoryOverhead": "800m",
           "spark.dynamicAllocation.initialExecutors": "4",
           "spark.dynamicAllocation.minExecutors": "4",
           "spark.dynamicAllocation.maxExecutors": "8"
@ad1happy2go
Contributor

Thanks for raising this @sgcisco. I noticed you are using compaction delta commits of 1. Any reason for that? If we need to compact after every commit, then we may as well use a COW table.
Another possibility is that the ingestion job is starved of resources because the async compaction job is consuming them. Did you analyze the Spark UI? Which stage started taking more time?

@sgcisco
Author

sgcisco commented Apr 30, 2024

@ad1happy2go thanks for your reply. We used compaction delta commits = 1 in only one of the tests; in the other runs, and in the configuration we are trying to use now, it is the default value of 5.

As another test, we ran the pipeline over several days with a lower ingestion rate of ~600 KB/s and the same Hudi and Spark configuration as above.

The most time-consuming stage is "Building workload profile", which takes 2.5-12 min, with an average of around 7 min.

[screenshot: 2024-04-30 19:44]

[screenshot: 2024-04-30 20:37]

Over 3 days of partitions, the latencies look like this:

[screenshot: 2024-04-30 21:00]

So in this case it is around 35-40 MB per minute (the current Structured Streaming micro-batch interval), and the workers can scale up to 35 GB of memory and 32 cores.
Does that look like a sufficient resource configuration for Hudi to handle such a load?

@ad1happy2go
Contributor

@sgcisco What is the nature of your record key? Is it a random id? "Building workload profile" does the index lookup, which is basically a join between the existing data and the incremental data to identify which records should be updated or inserted.
Are you seeing disk spill during this operation? You can try increasing the executor memory to avoid it.
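For example, one might bump the executor memory settings already shown in the Spark configuration above (a sketch only; the sizes are illustrative, not a recommendation):

    spark_memory_overrides = {
        "spark.executor.memory": "8800m",          # up from 4400m
        "spark.executor.memoryOverhead": "1600m",  # up from 800m
    }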

@sgcisco
Author

sgcisco commented May 1, 2024

@ad1happy2go the record key looks like record_keys=["timestamp", "A", "B", "C"], where timestamp is monotonically increasing in ms, A is a string with a range of about 500k values, B is similar to A, and C has at most a hundred values.
We use upsert, which is the default operation, but we don't expect any updates to the inserted values.
We tried insert, but the observed latencies were worse.
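For reference, a sketch of how this composite key maps onto the write options (the column names A, B, C are placeholders for the real field names, and the partition field name is an assumption):

    key_options = {
        "hoodie.datasource.write.recordkey.field": "timestamp,A,B,C",
        "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
        "hoodie.datasource.write.partitionpath.field": "partition_date",  # assumed field name
        "hoodie.datasource.write.operation": "upsert",                    # insert was also tried, with worse latency
    }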

Increasing the partitioning granularity from daily to hourly seems to help decrease latencies, but does not solve the problem completely.
[screenshot: 2024-05-01 22:07]

In this case the partition size goes down from 100 GB to 4.7 GB.

Are you seeing the disk spill during this operation, you can try increasing the executor memory to avoid the same.

No, not over a 15-hour running job:

[screenshot: 2024-05-01 22:19]

In this case, with a low ingestion rate of ~600 KB/s and hourly partitions, the Spark Structured Streaming operation duration / batch duration grows each time towards the end of the hour.

[screenshot]

The latencies in the written data show a similar pattern:

[screenshot]

@codope added the performance and priority:major labels on May 2, 2024