feat: Read Parquet data file with projection #245
Conversation
```rust
arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true)
    .with_metadata(HashMap::from([(
        PARQUET_FIELD_ID_META_KEY.to_string(),
        "0".to_string(),
    )])),
```
I took a look at the table metadata. The written Parquet schema doesn't match the table schema. As we need to project the correct columns in the Parquet file, I changed this.
Force-pushed from af7d6d1 to abc1c2a.
Force-pushed from 30010ca to cbf84d2.
@liurenjie1024 Thanks for providing the references to #251 and #252. I took a look at the Python reading projection in https://github.com/apache/iceberg-python/blob/6c8ea0effac0942ad4e880e5eef627473a354040/pyiceberg/io/pyarrow.py#L939. I'm wondering if we actually need #251 and #252 for pruning columns here. The arrow Parquet reader only requires us to identify the columns to read through a projection mask. The Python implementation requires #251 because it calls the scanner API, which needs the pruned schema. For us, I don't see where we need the pruned schema. I updated this PR to leverage the projection mask.
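To illustrate the idea in the comment above, here is a minimal std-only sketch (names and helper are hypothetical, not the PR's actual code): given the Iceberg field ids selected by the scan and a map from field id to Parquet leaf column index (as could be built from `PARQUET_FIELD_ID_META_KEY` metadata while walking the file schema), compute the leaf indices that would then be handed to parquet's `ProjectionMask::leaves`.

```rust
use std::collections::HashMap;

// Hypothetical helper: map selected Iceberg field ids to Parquet leaf column
// indices. Returns None if a selected field id has no matching leaf column.
fn leaf_indices(field_ids: &[usize], column_map: &HashMap<usize, usize>) -> Option<Vec<usize>> {
    field_ids.iter().map(|id| column_map.get(id).copied()).collect()
}

fn main() {
    // column_map as it might be built from the field-id metadata in the file.
    let column_map = HashMap::from([(1, 0), (2, 1), (3, 2)]);
    // Selecting field ids 1 and 3 yields leaf columns 0 and 2.
    assert_eq!(leaf_indices(&[1, 3], &column_map), Some(vec![0, 2]));
    // An unknown field id means the file is missing a selected column.
    assert_eq!(leaf_indices(&[9], &column_map), None);
}
```

With the parquet crate, the resulting indices would feed something like `ProjectionMask::leaves(builder.parquet_schema(), indices)` before building the record batch reader; no pruned Iceberg schema is needed for that step.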
Cool, I'll take a look later. Maybe Java's version is similar to this one.
Hi @viirya, sorry for the late reply; it took me some time to fully understand projection in Iceberg, and I've written up a summary here. Thanks for the idea of using a projection mask, it helps a lot in pruning unnecessary columns. However, I feel that we are still missing something, and I want to continue the discussion in #244 before merging this. What do you think?
@liurenjie1024 Thanks. Let me read through your summary first and explain what I've done in this PR in #244.
crates/iceberg/src/arrow.rs (Outdated)

```diff
@@ -49,10 +54,17 @@ impl ArrowReaderBuilder {
         self
     }

+    /// Sets the desired column projection with a list of field ids.
+    pub fn with_field_ids(mut self, field_ids: Vec<usize>) -> Self {
```
```diff
-pub fn with_field_ids(mut self, field_ids: Vec<usize>) -> Self {
+pub fn with_field_ids(mut self, field_ids: impl IntoIterator<Item = usize>) -> Self {
```
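A quick sketch of why the suggested signature is more ergonomic (the struct here is a simplified stand-in; the real builder holds more state): `impl IntoIterator<Item = usize>` lets callers pass a `Vec`, an array, a range, or any iterator without first collecting.

```rust
// Simplified stand-in for the builder discussed above.
pub struct ArrowReaderBuilder {
    pub field_ids: Vec<usize>,
}

impl ArrowReaderBuilder {
    // The suggested signature: accepts anything iterable over usize.
    pub fn with_field_ids(mut self, field_ids: impl IntoIterator<Item = usize>) -> Self {
        self.field_ids = field_ids.into_iter().collect();
        self
    }
}

fn main() {
    let builder = ArrowReaderBuilder { field_ids: vec![] };
    // An array literal works directly; so would `0..3` or `vec![0, 1, 2]`.
    let builder = builder.with_field_ids([0, 1, 2]);
    assert_eq!(builder.field_ids, vec![0, 1, 2]);
}
```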
crates/iceberg/src/arrow.rs (Outdated)

```rust
            ),
        ));
    }
    column_map.insert(basic_info.id(), idx);
```
We also need a check that their types match. How about converting the Parquet schema to an Arrow schema and using `filter_leaves` to do this match check? This way we only need to deal with the Iceberg schema and the Arrow schema.
I changed to use `filter_leaves`. Compared to what I did with the Parquet schema, however, the usage doesn't look great: because the closure passed to `filter_leaves` cannot propagate errors, an error that happens while matching fields cannot be surfaced cleanly. This could be improved by proposing a change to the `filter_leaves` API, but in this version we may have to tolerate it if we want to use `filter_leaves`.
Yes, I agree that maybe we should modify the `filter_leaves` API to return errors.
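The limitation being discussed can be shown with a std-only mock (this is not arrow's actual `filter_leaves`, just a function with the same bool-returning callback shape): the callback cannot return a `Result`, so a common workaround is recording the first error in a captured variable and checking it afterwards.

```rust
// Mock of a filter_leaves-style API: the callback can only return bool.
fn filter_leaves_like(items: &[i32], mut keep: impl FnMut(&i32) -> bool) -> Vec<i32> {
    items.iter().copied().filter(|x| keep(x)).collect()
}

fn main() {
    let mut first_err: Option<String> = None;
    let kept = filter_leaves_like(&[1, -2, 3], |x| {
        if *x < 0 {
            // We cannot `return Err(...)` from a bool-returning closure; we
            // can only record the error (e.g. a field type mismatch) and
            // drop the element.
            first_err.get_or_insert_with(|| format!("field mismatch at value {x}"));
            return false;
        }
        true
    });
    // The caller must remember to check the side channel after filtering.
    assert_eq!(kept, vec![1, 3]);
    assert_eq!(first_err.as_deref(), Some("field mismatch at value -2"));
}
```

An API returning `Result<Vec<_>, E>` from a fallible closure would make this error handling direct instead of out-of-band.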
```diff
@@ -187,6 +190,22 @@ impl TableScan {
         let mut arrow_reader_builder =
             ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone());

+        let mut field_ids = vec![];
+        for column_name in &self.column_names {
+            let field_id = self.schema.field_id_by_name(column_name).ok_or_else(|| {
```
As discussed in #244, we need to do two checks here to ensure that it's valid:
- The field is a direct child of the schema, i.e. not a nested field. We can do this by calling `Schema::as_struct::field_by_id::is_some`.
- Ensure that this field is a primitive type.
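The two checks above can be sketched with tiny stand-in types (the real code uses iceberg's schema structs; the names here are hypothetical):

```rust
#[derive(PartialEq)]
enum FieldType {
    Primitive,
    Struct,
}

struct Field {
    id: usize,
    ty: FieldType,
}

struct Schema {
    top_level: Vec<Field>,
}

// Hypothetical validation mirroring the two review checks.
fn check_selectable(schema: &Schema, field_id: usize) -> Result<(), String> {
    // Check 1: the field must be a direct child of the schema (not nested).
    let field = schema
        .top_level
        .iter()
        .find(|f| f.id == field_id)
        .ok_or_else(|| format!("field {field_id} is not a direct child of the schema"))?;
    // Check 2: the field must be a primitive type.
    if field.ty != FieldType::Primitive {
        return Err(format!("field {field_id} is not a primitive type"));
    }
    Ok(())
}

fn main() {
    let schema = Schema {
        top_level: vec![
            Field { id: 1, ty: FieldType::Primitive },
            Field { id: 2, ty: FieldType::Struct },
        ],
    };
    assert!(check_selectable(&schema, 1).is_ok());
    assert!(check_selectable(&schema, 2).is_err()); // struct: not primitive
    assert!(check_selectable(&schema, 9).is_err()); // not a direct child
}
```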
Added these checks.
Thank you @liurenjie1024 for the review. I will address these comments soon.
Force-pushed from 93b4283 to 1356baf.
Thanks @viirya for this PR, looks great! It would be better to report the error as `FeatureNotSupported` to make it more friendly to users. What do you think?
crates/iceberg/src/scan.rs (Outdated)

```rust
    ErrorKind::DataInvalid,
    format!(
        "Column {} is not a direct child of schema but a nested field. Schema: {}",
        column_name, self.schema
```
```diff
-ErrorKind::DataInvalid,
-format!(
-    "Column {} is not a direct child of schema but a nested field. Schema: {}",
-    column_name, self.schema
+ErrorKind::FeatureNotSupported,
+format!(
+    "Column {} is not a direct child of schema but a nested field, which is not supported now. Schema: {}",
+    column_name, self.schema
```
Okay. I used `FeatureUnsupported`.
crates/iceberg/src/scan.rs (Outdated)

```rust
    ErrorKind::DataInvalid,
    format!(
        "Column {} is not a primitive type. Schema: {}",
        column_name, self.schema
```
Ditto, returning a feature-not-supported error would be more user-friendly.
Thank you @liurenjie1024. Yeah, I think it is better to use `FeatureUnsupported`.
Cool, thanks!
We can now read a Parquet file with `TableScan` as a stream of Arrow `RecordBatch`es. However, it reads all columns without any column projection. This patch makes `TableScanBuilder::select` propagate the selected columns to `TableScan` so that the projection is applied to the scan operation.

Closes #244.
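The name-to-field-id propagation the description mentions can be sketched std-only (`field_id_by_name` is modeled with a plain map here; the names are assumptions, not the merged code): the builder records column names, and the scan resolves them to field ids before configuring the Arrow reader.

```rust
use std::collections::HashMap;

// Hypothetical helper: resolve selected column names to field ids, failing
// if a name is not present in the schema.
fn selected_field_ids(
    column_names: &[&str],
    name_to_id: &HashMap<&str, usize>,
) -> Result<Vec<usize>, String> {
    column_names
        .iter()
        .map(|name| {
            name_to_id
                .get(name)
                .copied()
                .ok_or_else(|| format!("column {name} is not found in schema"))
        })
        .collect()
}

fn main() {
    // name_to_id standing in for Schema::field_id_by_name lookups.
    let name_to_id = HashMap::from([("id", 1), ("name", 2)]);
    assert_eq!(selected_field_ids(&["name", "id"], &name_to_id), Ok(vec![2, 1]));
    // Selecting an unknown column is an error surfaced before reading data.
    assert!(selected_field_ids(&["missing"], &name_to_id).is_err());
}
```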