HIVE-28256: Iceberg: Major QB Compaction on partition level with evolution #5248
base: master
Conversation
throw new HiveException(ErrorMsg.INVALID_PARTITION_SPEC);
}
partitions = partitions.stream()
    .filter(part -> part.getSpec().size() == partitionSpec.size())
    .collect(Collectors.toList());
What are we checking here? The number of partitions in the table spec vs. the compaction request?
This validates that the partition spec given in the compaction command matches exactly one partition spec in the table, not a partial one.
Let's say a table has partitions with specs (a,b) and (a,b,c) because of evolution, and a compaction command is run with spec (a,b). On line 144 it will find both partition specs; after filtering, only (a,b) remains and validation passes.
Another case: assume the same table with specs (a,b) and (a,b,c), and a compaction command run with spec (a). On line 144 it will again find both partition specs, but after filtering zero partitions remain, and validation fails with a TOO_MANY_COMPACTION_PARTITIONS exception.
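To make the exact-match rule concrete, here is a minimal, self-contained sketch of the filtering step. The class and method names are hypothetical, and each partition is modeled as just the list of its spec's column names; the real code filters Partition objects by part.getSpec().size().

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartSpecFilterSketch {

  // Keep only the partitions whose spec has exactly as many fields as the
  // spec supplied in the compaction command; partial matches are dropped.
  static List<List<String>> filterByExactSpecSize(List<List<String>> partitions,
                                                  Map<String, String> partitionSpec) {
    return partitions.stream()
        .filter(part -> part.size() == partitionSpec.size())
        .collect(Collectors.toList());
  }
}
```

With specs (a,b) and (a,b,c) present, a request for (a,b) keeps one partition, while a request for (a) keeps zero, matching the two cases described above.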
commitOverwrite(table, branchName, startTime, filesForCommit, rewritePolicy);
Integer compactionSpecId = outputTable.jobContexts.stream()
    .findAny()
    .map(x -> x.getJobConf().get(CompactorContext.COMPACTION_SPEC_ID))
Is that the partition spec id? The name is confusing.
Fixed to make the name clearer.
    .map(x -> x.getJobConf().get(CompactorContext.COMPACTION_PARTITION_PATH))
    .orElse(null);

commitOverwrite(table, branchName, startTime, filesForCommit, rewritePolicy, compactionSpecId,
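The lookup pattern in the snippet above, picking any one job context and reading an optional property, can be sketched with plain maps standing in for JobConf. The class and method names here are hypothetical.

```java
import java.util.List;
import java.util.Map;

public class JobConfLookupSketch {

  // Read a numeric property from any one of the job contexts; Optional.map
  // short-circuits to empty when the key is absent, so we fall back to null.
  static Integer lookupSpecId(List<Map<String, String>> jobConfs, String key) {
    return jobConfs.stream()
        .findAny()
        .map(conf -> conf.get(key))
        .map(Integer::parseInt)
        .orElse(null);
  }
}
```

Because Optional.map treats a null mapping result as empty, a missing key yields null rather than a NumberFormatException.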
Should we handle compaction separately, in a different method?
I was also thinking about that; done.
public static final String COMPACTION_PART_SPEC_ID = "compaction_part_spec_id";
public static final String COMPACTION_PARTITION_PATH = "compaction_partition_path";
I think it's better to move such constants to Iceberg-specific classes, since I see this being used only in Iceberg right now.
Done
throw new HiveException("Invalid partition spec, no corresponding spec_id found");
}

int specId = partitionList.get(0).second();
Should we add a check that this list contains only one partition, and throw an exception if not?
@@ -81,11 +128,12 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
     .filter(col -> !partSpecMap.containsKey(col))
     .collect(Collectors.joining(","));

-  compactionQuery = String.format("insert overwrite table %1$s partition(%2$s) select %4$s from %1$s where %3$s",
+  compactionQuery = String.format("insert overwrite table %1$s partition(%2$s) " +
+      "select %4$s from %1$s where %3$s and partition__spec__id = %5$d",
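For readers unfamiliar with positional format specifiers, here is a minimal sketch of how the format string in the diff above expands. The wrapper class and parameter names are hypothetical; only the format string itself comes from the diff. Note %1$s (the table name) is reused twice.

```java
public class CompactionQuerySketch {

  // Expand the positional format string used to build the compaction query:
  // %1$s = table, %2$s = partition spec, %3$s = where clause,
  // %4$s = projected columns, %5$d = partition spec id.
  static String buildQuery(String table, String partSpec, String whereClause,
                           String cols, int specId) {
    return String.format("insert overwrite table %1$s partition(%2$s) " +
        "select %4$s from %1$s where %3$s and partition__spec__id = %5$d",
        table, partSpec, whereClause, cols, specId);
  }
}
```

For example, buildQuery("t", "p=1", "p=1", "c", 2) yields an insert-overwrite statement scoped to spec id 2.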
Can we try to use VirtualColumn.PARTITION_SPEC_ID.getName() instead of partition__spec__id? This would indicate that we are using a virtual column.
Done
    .map(x -> x.getJobConf().get(CompactorContext.COMPACTION_PARTITION_PATH))
    .orElse(null);

if (rewritePolicy != RewritePolicy.DEFAULT || compactionPartSpecId != null) {
What is the rewrite policy in this case? I see only 2 enum values - DEFAULT & ALL_PARTITIONS. Is there a chance that this can be null?
Added a new value 'PARTITION', as it was useful for handling your last review comment regarding the validatePartSpec method.
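Based on the exchange above, the enum after this change would look roughly like the following sketch: DEFAULT and ALL_PARTITIONS existed before, and PARTITION is the value added for partition-level compaction (the exact declaration in the PR may differ).

```java
// Sketch of the rewrite policy enum discussed above.
enum RewritePolicy {
  DEFAULT,        // regular (minor/incremental) rewrite behavior
  ALL_PARTITIONS, // major compaction across the whole table
  PARTITION       // new: major compaction of a single partition
}
```

Since a Java enum field is either one of these constants or null when unset, the null check in the snippet above remains meaningful.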
LOG.info("Compaction commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime,
    table, results.dataFiles().size());
} else {
  LOG.info("Empty compaction commit, took {} ms for table: {}", System.currentTimeMillis() - startTime, table);
Does compaction ever reach this statement? Also, the log statement seems misleading: no commit has happened on the table when 0 files are present.
I thought that if a table is empty, compaction might reach this place.
If there is no data, then there is nothing to commit; that's why there is no commit.
 * @param results The object containing the new files
 */
private void commitOverwrite(Table table, String branchName, long startTime, FilesForCommit results) {
  Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not handle deletes with overwrite");
What is the idea behind this check?
I am not sure; it wasn't added as part of compaction, it was there before.
@@ -1868,7 +1872,8 @@ public void validatePartSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
   }

   Map<String, Types.NestedField> mapOfPartColNamesWithTypes = Maps.newHashMap();
-  for (PartitionField partField : table.spec().fields()) {
+  List<PartitionField> allPartFields = IcebergTableUtil.getAllPartitionFields(table);
getAllPartitionFields essentially returns all columns across the different specs of the table, whereas the validatePartSpec API is used in many places where the current table spec is expected. Hence I think this is incorrect. Doing this might allow performing
insert into table <tableName> partition (previous partition specs) ...
which should not be allowed.
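The distinction drawn here, the current spec versus the union of fields across every spec the table has ever had, can be sketched with plain maps standing in for Iceberg's PartitionSpec objects. All names in this sketch are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class AllSpecsSketch {

  // specs maps spec id -> field names of that spec (a stand-in for
  // table.specs()); the result is the deduplicated union of all fields,
  // which is broader than any single (e.g. the current) spec.
  static List<String> allPartitionFields(Map<Integer, List<String>> specs) {
    List<String> all = new ArrayList<>();
    for (List<String> fields : specs.values()) {
      for (String f : fields) {
        if (!all.contains(f)) { // de-duplicate fields shared by several specs
          all.add(f);
        }
      }
    }
    return all;
  }
}
```

For a table that evolved from spec (a,b) to (a,b,c), the union contains three fields, so validating an insert against it would wrongly accept the retired two-field spec; that is the concern raised in this comment.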
Done. Added 2 new methods, 'validatePartAnySpec' and 'getPartitionAnySpec', which are needed for Iceberg partition-level compaction, which operates on all specs of a table, not only the latest one.
… by spec by any past table specs. Moved Iceberg compaction constant to a class in Iceberg module. Use VirtualColumn.PARTITION_SPEC_ID.getName() instead of partition__spec__id.
What changes were proposed in this pull request?
Adding support for compacting a given partition of a Hive Iceberg table even if the table has undergone partition evolution. The partition spec can be current or one of the older partition specs of the table.
Why are the changes needed?
So far compaction on partition level wasn't supported for Hive Iceberg tables that have undergone partition evolution.
Does this PR introduce any user-facing change?
Yes. Users can now submit partition-level compaction requests for Hive Iceberg tables with a partition spec that conforms to the current or one of the previous partition specs of the table.
Is the change a dependency upgrade?
No
How was this patch tested?
New q-tests added