[Native] CTE support in Prestissimo #22630

Open
aditi-pandit opened this issue Apr 29, 2024 · 9 comments

Comments

@aditi-pandit
Contributor

aditi-pandit commented Apr 29, 2024

Expected Behavior or Use Case

Presto Java supports CTEs (WITH clause) with materialization:
https://prestodb.io/docs/0.286/admin/properties.html#cte-materialization-properties

Investigate the usage of these properties in Prestissimo.

Presto Component, Service, or Connector

Presto native

Possible Implementation

Example Screenshots (if appropriate):

Context

@aditi-pandit
Contributor Author

aditi-pandit commented Apr 30, 2024

Notes: The Presto CTE design is based on https://www.vldb.org/pvldb/vol8/p1704-elhelw.pdf

The main PR is #20887.

The core of the logic that wires CTE Producer, Consumer, and Sequence nodes for CTEs is in the logical optimizer. Physical planning then translates all of this into temporary table writes and reads.

TODO: Cover the gaps after physical planning.

@aditi-pandit
Contributor Author

The first issue I have encountered is that CTE materialization generates bucketed (but not partitioned) TEMP tables, as supported by HMS. These are not supported in Prestissimo:

presto:tpch> WITH temp as (SELECT orderkey FROM ORDERS) SELECT * FROM temp t1;

Query 20240501_001323_00026_5bknt failed: hiveInsertTableHandle->bucketProperty == nullptr || isPartitioned Bucketed table must be partitioned: {"@type":"hive","actualStorageFormat":"ORC","bucketProperty":{"bucketCount":100,"bucketFunctionType":"PRESTO_NATIVE","bucketedBy":["_c0_orderkey"],"sortedBy":[],"types":["bigint"]},"compressionCodec":"SNAPPY","inputColumns":[{"@type":"hive","columnType":"REGULAR","hiveColumnIndex":0,"hiveType":"bigint","name":"_c0_orderkey","requiredSubfields":[],"typeSignature":"bigint"}],"locationHandle":{"tableType":"TEMPORARY","targetPath":"file:/Users/aditipandit/ahana_dev/data/PARQUET/hive_data/__temporary_tables__/__presto_temporary_table_ORC_20240501_001323_00026_5bknt_84eb3155_160a_49e4_892f_ada767d32586","writeMode":"DIRECT_TO_TARGET_NEW_DIRECTORY","writePath":"file:/Users/aditipandit/ahana_dev/data/PARQUET/hive_data/__temporary_tables__/__presto_temporary_table_ORC_20240501_001323_00026_5bknt_84eb3155_160a_49e4_892f_ada767d32586"},"pageSinkMetadata":{"schemaTableName":{"schema":"__temporary_tables__","table":"__presto_temporary_table_orc_20240501_001323_00026_5bknt_84eb3155_160a_49e4_892f_ada767d32586"},"table":{"dataColumns":[{"name":"_c0_orderkey","type":"bigint"}],"databaseName":"__temporary_tables__","owner":"user","parameters":{},"partitionColumns":[],"storage":{"bucketProperty":{"bucketCount":100,"bucketFunctionType":"PRESTO_NATIVE","bucketedBy":["_c0_orderkey"],"sortedBy":[],"types":["bigint"]},"location":"","parameters":{},"serdeParameters":{},"skewed":false,"storageFormat":{"inputFormat":"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat","outputFormat":"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat","serDe":"org.apache.hadoop.hive.ql.io.orc.OrcSerde"}},"tableName":"__presto_temporary_table_ORC_20240501_001323_00026_5bknt_84eb3155_160a_49e4_892f_ada767d32586","tableType":"TEMPORARY_TABLE"}},"partitionStorageFormat":"ORC","preferredOrderingColumns":[],"schemaName":"__temporary_tables__","tableName":"__presto_temporary_tab
le_orc_20240501_001323_00026_5bknt_84eb3155_160a_49e4_892f_ada767d32586","tableStorageFormat":"ORC"}

VeloxUserError: hiveInsertTableHandle->bucketProperty == nullptr || isPartitioned Bucketed table must be partitioned: {"@type":"hive","actualStorageFormat":"ORC","bucketProperty":{"bucketCount":100,"bucketFunctionType":"PRESTO_NATIVE","bucketedBy":["_c0_orderkey"],"sortedBy":[],"types":["bigint"]},"compressionCodec":"SNAPPY","inputColumns":[{"@type":"hive","columnType":"REGULAR","hiveColumnIndex":0,"hiveType":"bigint","name":"_c0_orderkey","requiredSubfields":[],"typeSignature":"bigint"}],"locationHandle":{"tableType":"TEMPORARY","targetPath":"file:/Users/aditipandit/ahana_dev/data/PARQUET/hive_data/__temporary_tables__/__presto_temporary_table_ORC_20240501_001323_00026_5bknt_84eb3155_160a_49e4_892f_ada767d32586","writeMode":"DIRECT_TO_TARGET_NEW_DIRECTORY","writePath":"file:/Users/aditipandit/ahana_dev/data/PARQUET/hive_data/__temporary_tables__/__presto_temporary_table_ORC_20240501_001323_00026_5bknt_84eb3155_160a_49e4_892f_ada767d32586"},"pageSinkMetadata":{"schemaTableName":{"schema":"__temporary_tables__","table":"__presto_temporary_table_orc_20240501_001323_00026_5bknt_84eb3155_160a_49e4_892f_ada767d32586"},"table":{"dataColumns":[{"name":"_c0_orderkey","type":"bigint"}],"databaseName":"__temporary_tables__","owner":"user","parameters":{},"partitionColumns":[],"storage":{"bucketProperty":{"bucketCount":100,"bucketFunctionType":"PRESTO_NATIVE","bucketedBy":["_c0_orderkey"],"sortedBy":[],"types":["bigint"]},"location":"","parameters":{},"serdeParameters":{},"skewed":false,"storageFormat":{"inputFormat":"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat","outputFormat":"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat","serDe":"org.apache.hadoop.hive.ql.io.orc.OrcSerde"}},"tableName":"__presto_temporary_table_ORC_20240501_001323_00026_5bknt_84eb3155_160a_49e4_892f_ada767d32586","tableType":"TEMPORARY_TABLE"}},"partitionStorageFormat":"ORC","preferredOrderingColumns":[],"schemaName":"__temporary_tables__","tableName":"__presto_temporary_table_orc_20240501_001323_000
26_5bknt_84eb3155_160a_49e4_892f_ada767d32586","tableStorageFormat":"ORC"}

Will review this support in Presto Native.
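The condition in the error above can be reproduced in miniature. The following is a Python sketch, not Velox code: the helper names are hypothetical, the handle is a trimmed copy of the one in the error message, and treating a handle as partitioned when any input column has `columnType` `PARTITION_KEY` is an assumption made for illustration.

```python
import json

# Trimmed copy of the insert table handle printed in the error above.
HANDLE_JSON = """
{
  "bucketProperty": {"bucketCount": 100, "bucketedBy": ["_c0_orderkey"]},
  "inputColumns": [{"name": "_c0_orderkey", "columnType": "REGULAR"}],
  "locationHandle": {"tableType": "TEMPORARY"}
}
"""


def is_partitioned(handle: dict) -> bool:
    # Assumption: a handle counts as partitioned when at least one input
    # column is a partition key.
    return any(c["columnType"] == "PARTITION_KEY" for c in handle["inputColumns"])


def check_insert_handle(handle: dict) -> None:
    # Mirrors the condition text: bucketProperty == nullptr || isPartitioned
    if not (handle.get("bucketProperty") is None or is_partitioned(handle)):
        raise ValueError("Bucketed table must be partitioned")


def try_insert(handle: dict) -> str:
    # Report whether the handle passes the check instead of raising.
    try:
        check_insert_handle(handle)
        return "accepted"
    except ValueError as error:
        return f"rejected: {error}"


cte_temp_handle = json.loads(HANDLE_JSON)
print(try_insert(cte_temp_handle))
```

The CTE temp table handle has a bucket property but no partition key columns, so it is the one combination the check rejects.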

@aditi-pandit
Contributor Author

aditi-pandit commented May 13, 2024

Identified 3 sub-parts of coding for this feature:
i) Add support for bucketed (but not partitioned) tables. This enables CTAS into such tables, although subsequent insertions are not allowed, which is consistent with Presto behavior. In progress: facebookincubator/velox#9740 and #22737.
ii) Velox/Prestissimo are not aware of temporary tables right now. Add this option and support it consistently with Presto. At this point CTE for DWRF/Parquet will work. facebookincubator/velox#9844 and #22780.
iii) Add an optimized format for materialization.

@tdcmeehan
Contributor

> iii) Add an optimized format for materialization.

I believe in Java we use the PRESTO_PAGE format. I wonder if it's cheaper to use Arrow for Prestissimo workers?

@aditi-pandit
Contributor Author

> iii) Add an optimized format for materialization.
>
> I believe in Java we use the PRESTO_PAGE format. I wonder if it's cheaper to use Arrow for Prestissimo workers?

@tdcmeehan : Yes, Java uses PRESTO_PAGE format. For Velox, Arrow or just the format used for spilling should be efficient. I'll prototype the speed-ups seen with both.

@aditi-pandit
Contributor Author

aditi-pandit commented May 17, 2024

@jaystarshot : facebookincubator/velox#9844 and #22780

In the Presto PR I derived a Native test from TestCteExecution.java so that all your tests are run on the Native side. 31 of the tests passed but 18 failed. I'm looking at the failures in more detail, but it would be great if you took a look as well.

You have to apply the Velox PR changes in your Velox submodule with the Presto PR to get a working setup.

@jaystarshot
Member

jaystarshot commented May 18, 2024

Even if we use Arrow, after the reads and before the writes we might still need to convert to the Presto page format when we exchange data from the table scan stage. I believe the spilling format should be the same as the Presto page format. I found this serializer in the code, which seems to serialize in the Presto page format: https://github.com/facebookincubator/velox/blob/main/velox/serializers/PrestoSerializer.cpp#L49

@aditi-pandit
Contributor Author

@jaystarshot : Spilling uses PrestoSerializer. I'll create a PR that adds a PrestoPageWriter that we can wire into this code.

@jaystarshot
Member

That's awesome! You will need the reader as well. I was trying to look into this as a Velox beginner task for myself, but I will leave it to the experts!

facebook-github-bot pushed a commit to facebookincubator/velox that referenced this issue May 26, 2024
Summary:
The Velox HiveConnector supports writing bucketed files only when they are partitioned as well. This is a feature gap with respect to Presto.

Presto behavior (for bucketed but not partitioned):

- Supports CTAS into bucketed (but not partitioned) tables.
- Cannot append/overwrite to existing bucketed tables (though can append to TEMPORARY ones).

The CTAS into bucketed tables has become important because such tables are used for CTE (WITH clause).
Note: This PR only handles CTAS situations. There will be a separate PR for TEMPORARY tables. prestodb/presto#19744 prestodb/presto#22630

### Background
#### TableWriter and TableFinish

Presto uses TableWriter PlanNodes to do the writing operations. The TableWriter nodes run on the workers. These nodes write the input rows into data files (in a staging directory, before moving them to a target directory). The TableWriter node works in conjunction with a TableCommit node on the coordinator. The TableCommit node (TableFinishOperator) does the final renaming of the target directory and commits to the metastore.

It is important to note that plans with Bucketed tables involve a LocalExchange that brings all the data to a single driver for TableWriter so that it can bucket and write the data appropriately.

```
EXPLAIN CREATE TABLE lineitem_bucketed2(orderkey, partkey, suppkey, linenumber, quantity, ds) WITH (bucket_count = 10, bucketed_by = ARRAY['orderkey'], sorted_by = ARRAY['orderkey']) AS SELECT orderkey, partkey, suppkey, linenumber, quantity, '2021-12-20' FROM tpch.tiny.lineitem;
```

Plan with TableWriter and TableCommit nodes. Note the LocalExchange moving all data to a single driver.
```
- Output[PlanNodeId 7]
     - TableCommit[PlanNodeId 5][Optional[hive.tpch_bucketed.lineitem_bucketed2]] => [rows_23:bigint]
         - RemoteStreamingExchange[PlanNodeId 299][GATHER] => [rows:bigint, fragments:varbinary, commitcontext:varbinary]
             - TableWriter[PlanNodeId 6] => [rows:bigint, fragments:varbinary, commitcontext:varbinary]
                     orderkey := orderkey (1:194)  partkey := partkey (1:204) suppkey := suppkey (1:213) linenumber := linenumber (1:222) quantity := quantity (1:234) ds := expr (1:244)
                 - LocalExchange[PlanNodeId 330][SINGLE] () => [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, expr:varchar(10)] >
                         - RemoteStreamingExchange[PlanNodeId 298][REPARTITION] => [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, expr:varcha>
                              - ScanProject[PlanNodeId 0,187][table = TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, project>
                                 expr := VARCHAR'2021-12-20' suppkey := tpch:suppkey (1:262) partkey := tpch:partkey (1:262) linenumber := tpch:linenumber (1:262) orderkey := tpch:orderkey (1:262) quantity := tpch:quantity (1:262)
```

The above command creates 10 files, one per bucket (the bucket count is 10):

```
Aditis-MacBook-Pro:lineitem_bucketed aditipandit$ pwd
${DATA_DIR}/hive_data/tpch/lineitem_bucketed

Aditis-MacBook-Pro:lineitem_bucketed2 aditipandit$ ls
000000_0_20240507_221727_00018_73r2r
000003_0_20240507_221727_00018_73r2r
000006_0_20240507_221727_00018_73r2r
000009_0_20240507_221727_00018_73r2r
000001_0_20240507_221727_00018_73r2r
000004_0_20240507_221727_00018_73r2r
000007_0_20240507_221727_00018_73r2r
000002_0_20240507_221727_00018_73r2r
000005_0_20240507_221727_00018_73r2r
000008_0_20240507_221727_00018_73r2r
```
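The listing suggests a naming pattern: a six-digit, zero-padded bucket number, then `_0_`, then the query id. The Python sketch below reproduces the names under that assumption. The helper `bucket_file_name` is hypothetical, and the meaning of the middle `0` is not stated here, so it is kept as a fixed literal.

```python
from typing import List


def bucket_file_name(bucket: int, query_id: str) -> str:
    # Assumed pattern, inferred only from the directory listing above:
    # <bucket number padded to 6 digits>_0_<query id>
    return f"{bucket:06d}_0_{query_id}"


def bucket_file_names(bucket_count: int, query_id: str) -> List[str]:
    # One file name per bucket, in bucket order.
    return [bucket_file_name(b, query_id) for b in range(bucket_count)]


names = bucket_file_names(10, "20240507_221727_00018_73r2r")
for name in names:
    print(name)
```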

#### TableWriter output
The TableWriter output contains three columns, with one row per fragment (one fragment for each individual target file). This format is presented for completeness.
**There are no special changes for bucketed tables here. The only important difference is that the writePath/targetPath would not contain the partition directory.**

| TableWriter output row |
|--------|
| ROW<rows:BIGINT,fragments:VARBINARY,commitcontext:VARBINARY> |

| Rows | Fragments | CommitContext |
|--------|--------|--------|
| N (numPartitionUpdates) | NULL | TaskCommitContext |
| NULL | PartitionUpdate0 |  |
| NULL | PartitionUpdate1 |  |
| NULL | ... |  |
| NULL | PartitionUpdateN |  |

The fragments column holds JSON strings of PartitionUpdate in the following format:
```
{
"Name": "ds=2022-08-06/partition=events_pcp_product_finder_product_similartiy__groupby__999999998000212604",
"updateMode": "NEW",
"writePath": "",
"targetPath": "",
"fileWriteInfos": [
   { "writeFileName": "", "targetFileName": "", "fileSize": 3517346970 },
   { "writeFileName": "", "targetFileName": "", "fileSize": 4314798687 } ],
"rowCount": 3950431150,
"inMemoryDataSizeInBytes": 4992001194927,
"onDiskDataSizeInBytes": 1374893372141,
"containsNumberedFileNames": false
}
```

The commitcontext column is a constant vector holding the TaskCommitContext as a JSON string:
```
{
"lifespan": "TaskWide",
"taskId": "20220822_190126_00000_78c2f.1.0.0",
"pageSinkCommitStrategy": "TASK_COMMIT",
"lastPage": false
}
```
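To make the row layout concrete, here is a minimal Python sketch that assembles output rows in the shape described above. It is an illustration, not the Velox operator: `table_writer_output` is a hypothetical helper, and repeating the commit context on every row follows the "constant vector" wording (the table shows it on the first row only).

```python
import json
from typing import List, Optional, Tuple

# (rows, fragments, commitcontext), matching the three output columns.
Row = Tuple[Optional[int], Optional[str], Optional[str]]


def table_writer_output(partition_updates: List[dict], commit_context: dict) -> List[Row]:
    context_json = json.dumps(commit_context)
    # First row: the count N (numPartitionUpdates) and a NULL fragment.
    rows: List[Row] = [(len(partition_updates), None, context_json)]
    # Following rows: NULL count and one PartitionUpdate JSON string each.
    for update in partition_updates:
        rows.append((None, json.dumps(update), context_json))
    return rows


updates = [
    {"updateMode": "NEW", "rowCount": 10},
    {"updateMode": "NEW", "rowCount": 20},
]
context = {"lifespan": "TaskWide", "pageSinkCommitStrategy": "TASK_COMMIT", "lastPage": False}
for row in table_writer_output(updates, context):
    print(row)
```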

#### Empty buckets
The TableWriter generates PartitionUpdate messages only for the files it has written. So if there are empty buckets, there are no PartitionUpdate messages for them.

If a bucket has no PartitionUpdate output message, the TableFinish operator fixes up the Hive metastore with an empty file for that bucket. https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java#L1794
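The missing-bucket detection can be sketched in a few lines of Python. This is not the HiveMetadata code: `missing_buckets` is a hypothetical helper, and it assumes that each written file name starts with its zero-padded bucket number, as in the directory listing earlier.

```python
from typing import Iterable, List, Set


def written_buckets(target_file_names: Iterable[str]) -> Set[int]:
    # Assumption: the bucket number is the prefix before the first underscore.
    return {int(name.split("_", 1)[0]) for name in target_file_names}


def missing_buckets(target_file_names: Iterable[str], bucket_count: int) -> List[int]:
    # Buckets with no written file; these would need empty files at commit.
    seen = written_buckets(target_file_names)
    return [bucket for bucket in range(bucket_count) if bucket not in seen]


reported = ["000000_0_q1", "000002_0_q1"]
print(missing_buckets(reported, bucket_count=4))
```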

### Design

As outlined above, all table writing happens in the TableWriter operator.

The TableWriter forwards the write to the HiveDataSink which is registered by the HiveConnector for it.

The HiveDataSink already supported bucketed (and partitioned) tables. So all the logic for wiring bucket metadata and bucket computation already existed. The only missing piece was to handle fileNames for bucketed but not partitioned files in the writerIds, and map the proper writerId to input rows when appending to the HiveDataSink. This PR fixes that.
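One way to picture the writerId mapping is a key made of an optional partition and a bucket, where the partition part is simply absent for bucketed but not partitioned tables. The Python sketch below illustrates that idea only; it is not the HiveDataSink code. All names are hypothetical, and `toy_bucket` is a placeholder rather than the Hive or Presto-native bucket function.

```python
from typing import Callable, Dict, List, Optional, Tuple

# (partition name or None, bucket number)
WriterId = Tuple[Optional[str], int]


def toy_bucket(key: int, bucket_count: int) -> int:
    # Placeholder only: NOT the Hive or Presto-native bucket function.
    return key % bucket_count


def assign_writers(
    keys: List[int],
    bucket_count: int,
    partition_of: Optional[Callable[[int], str]] = None,
) -> Dict[WriterId, List[int]]:
    # Map each writer id to the indices of the input rows it should receive.
    writers: Dict[WriterId, List[int]] = {}
    for row_index, key in enumerate(keys):
        # Without a partition function, the partition part stays None.
        partition = partition_of(key) if partition_of else None
        writer_id = (partition, toy_bucket(key, bucket_count))
        writers.setdefault(writer_id, []).append(row_index)
    return writers


print(assign_writers([10, 11, 12, 13], bucket_count=2))
```

With no partition function every row still lands on exactly one writer, keyed by bucket alone, which is the case this PR enables.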

********************************************
Note: The Prestissimo changes are in prestodb/presto#22737

Pull Request resolved: #9740

Reviewed By: kewang1024

Differential Revision: D57748876

Pulled By: xiaoxmeng

fbshipit-source-id: 33bb77c6fce4d2519f3214e2fb93891f1f910716