Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(java): add spark catalog basic batch write #2133

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

LuQQiu
Copy link
Collaborator

@LuQQiu LuQQiu commented Mar 31, 2024

Add Spark catalog basic structure
Implemented the CREATE TABLE statement

@LuQQiu LuQQiu requested review from eddyxu and beinan March 31, 2024 03:19
@LuQQiu LuQQiu changed the title feat(java): add spark catalog basic structure [WIP] feat(java): add spark catalog basic structure Apr 1, 2024
Copy link

github-actions bot commented Apr 1, 2024

ACTION NEEDED

Lance follows the Conventional Commits specification for release automation.

The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.

For details on the error please inspect the "PR Title Check" action.

* @param params write params
* @return Dataset
*/
public static Dataset createEmptyDataSet(String path, Schema schema,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createEmptyDataset

try (RootAllocator allocator = new RootAllocator();
VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
ByteArrayOutputStream schemaOnlyOutStream = new ByteArrayOutputStream();
try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Writing data is really tedious in this way.
Anyway we can improve the java / rust API ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetSchema is added, next will be adding the Spark write support.
Will work on designing/improving the write APIs after we have a clear idea of how write is called.
(Do feel it's very ugly now

return new ArrowType.Utf8();
} else if (dataType instanceof DoubleType) {
return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
} else if (dataType instanceof FloatType) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious how does the FixedSizeListArray to be mapped in Spark.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to use Spark's ArrowUtils
Screen Shot 2024-04-03 at 7 56 34 PM

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

emmmm Spark ArrowUtils only deal with ArrowType.List but do not have conversion for FixedSizeListArray

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some metadata / hints to help here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NP, will see how to deal with FixSizeListArray and also check other commonly used types

String tableName = "dev.db.lance_df_table";
// Same as create + insert
data.writeTo(tableName).using("lance").create();
spark.table(tableName).show();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also check that manifest files have been created under the directory?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add to the test! thanks

@LuQQiu LuQQiu force-pushed the sparkWriteStructure branch 2 times, most recently from da7ac2c to 867e23a Compare April 10, 2024 04:28
Copy link
Contributor

@eddyxu eddyxu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor issues

@@ -120,3 +124,32 @@ impl JMapExt for JMap<'_, '_, '_> {
get_map_value(env, self, key)
}
}

pub struct SingleRecordBatchReader {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this for?
Can we reuse arrow's RecordBatchIterator?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}

private static DataType convert(org.apache.arrow.vector.types.pojo.FieldType fieldType) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move the full qualitifed type to import?

Copy link
Contributor

@eddyxu eddyxu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pending CI fix and address all comments

@LuQQiu
Copy link
Collaborator Author

LuQQiu commented Apr 11, 2024

@eddyxu Since this PR has some ongoing spark work e.g. the fragment create is completed but the commit is not implement, could we merge the jni one first? #2175. #2175 is self contained

private final ArrowWriter writer;

private UnpartitionedDataWriter(String datasetUri, StructType sparkSchema) {
// TODO(lu) add Lance Spark configuration of maxRowsPerFragment?
Copy link
Collaborator Author

@LuQQiu LuQQiu Apr 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eddyxu the java approach is write to VectorSchemaRoot, unload from vectorSchemaRoot to c.ArrowArray, and pass to Lance to write to a Lance Fragment.
If i do small batch write could result in a large amount of small Lance Fragment files. If i do large batch write, could result in high memory consumption. Haven't found an existing good Arrow supported approach that Spark can keep write batches to VectorSchemaRoot and Lance Java API can keep unload batches and turn to c.ArrowArray and keep appending to same fragment

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will improve this in following PR

@LuQQiu LuQQiu changed the title [WIP] feat(java): add spark catalog basic structure feat(java): add spark catalog basic structure Apr 26, 2024
@LuQQiu LuQQiu changed the title feat(java): add spark catalog basic structure feat(java): add spark catalog basic batch write Apr 26, 2024
@LuQQiu
Copy link
Collaborator Author

LuQQiu commented Apr 26, 2024

@eddyxu @QianZhu @chebbyChefNEQ PTAL thanks!
Add the basic batch write for Spark.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants