Skip to content

Commit

Permalink
Support default value and json format import for Tensor and TensorArr…
Browse files Browse the repository at this point in the history
…ay data type (#1241)

### What problem does this PR solve?

Support default value and json format import for Tensor and TensorArray
data type

Issue link: #1179

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
- [x] Refactoring
- [x] Test cases
  • Loading branch information
yangzq50 committed May 23, 2024
1 parent 9598242 commit 005bc86
Show file tree
Hide file tree
Showing 13 changed files with 174 additions and 280 deletions.
4 changes: 2 additions & 2 deletions src/executor/expression/expression_evaluator.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

module;

export module expression_evaluator;

import stl;
import base_expression;
import aggregate_expression;
Expand All @@ -28,8 +30,6 @@ import data_block;
import column_vector;
import expression_state;

export module expression_evaluator;

namespace infinity {

export class ExpressionEvaluator {
Expand Down
4 changes: 2 additions & 2 deletions src/executor/expression/expression_state.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

module;

export module expression_state;

import stl;
import base_expression;
import aggregate_expression;
Expand All @@ -25,8 +27,6 @@ import value_expression;
import in_expression;
import column_vector;

export module expression_state;

namespace infinity {

export enum class AggregateFlag : i8 {
Expand Down
151 changes: 77 additions & 74 deletions src/executor/operator/physical_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,46 +539,80 @@ void PhysicalImport::CSVRowHandler(void *context) {
parser_context->block_entry_ = std::move(block_entry);
}

/// Parse the json array stored under `column_name` in `line_json` as a flat
/// list of element type T and append it to `column_vector` as one Tensor value.
/// @param line_json        one imported JSONL row
/// @param column_name      key of the tensor column inside the row
/// @param column_vector    destination column; its type_info carries the tensor layout
/// @param embedding_info   element type/dimension of a single embedding
/// Raises a recoverable ImportFileFormatError when the element count is not a
/// whole multiple of the embedding dimension.
template <typename T>
void AppendJsonTensorToColumn(const nlohmann::json &line_json,
                              const String &column_name,
                              ColumnVector &column_vector,
                              EmbeddingInfo *embedding_info) {
    // Plain value initialization moves the temporary returned by get<>();
    // no need for the rvalue-reference binding the original used.
    const Vector<T> embedding = line_json[column_name].get<Vector<T>>();
    if (embedding.size() % embedding_info->Dimension() != 0) {
        Status status = Status::ImportFileFormatError(
            fmt::format("Tensor element count {} isn't multiple of dimension {}.", embedding.size(), embedding_info->Dimension()));
        LOG_ERROR(status.message());
        RecoverableError(status);
    }
    // A tensor is stored as a contiguous byte blob of N embeddings.
    const auto input_bytes = embedding.size() * sizeof(T);
    const Value embedding_value =
        Value::MakeTensor(reinterpret_cast<const_ptr_t>(embedding.data()), input_bytes, column_vector.data_type()->type_info());
    column_vector.AppendValue(embedding_value);
}

template <>
void AppendJsonTensorToColumn<bool>(const nlohmann::json &line_json,
const String &column_name,
ColumnVector &column_vector,
EmbeddingInfo *embedding_info) {
Vector<float> &&embedding = line_json[column_name].get<Vector<float>>();
if (embedding.size() % embedding_info->Dimension() != 0) {
Status status = Status::ImportFileFormatError(
fmt::format("Tensor element count {} isn't multiple of dimension {}.", embedding.size(), embedding_info->Dimension()));
LOG_ERROR(status.message());
RecoverableError(status);
}
const auto input_bytes = (embedding.size() + 7) / 8;
auto input_data = MakeUnique<u8[]>(input_bytes);
for (SizeT i = 0; i < embedding.size(); ++i) {
if (embedding[i]) {
input_data[i / 8] |= (1u << (i % 8));
/// Recursively translate a json value into a ConstantExpr literal.
/// Supported inputs: bool, signed/unsigned integer, float, string, and arrays
/// of integers, doubles, or nested arrays (for TensorArray import).
/// On an empty array or an unsupported (element) type, logs the problem and
/// raises a recoverable ImportFileFormatError; returns nullptr on that path.
SharedPtr<ConstantExpr> BuildConstantExprFromJson(const nlohmann::json &json_object) {
    switch (json_object.type()) {
        case nlohmann::json::value_t::boolean: {
            auto res = MakeShared<ConstantExpr>(LiteralType::kBoolean);
            res->bool_value_ = json_object.get<bool>();
            return res;
        }
        case nlohmann::json::value_t::number_unsigned:
        case nlohmann::json::value_t::number_integer: {
            auto res = MakeShared<ConstantExpr>(LiteralType::kInteger);
            res->integer_value_ = json_object.get<i64>();
            return res;
        }
        case nlohmann::json::value_t::number_float: {
            auto res = MakeShared<ConstantExpr>(LiteralType::kDouble);
            res->double_value_ = json_object.get<double>();
            return res;
        }
        case nlohmann::json::value_t::string: {
            auto res = MakeShared<ConstantExpr>(LiteralType::kString);
            // ConstantExpr owns str_value_ as a malloc'd C string (parser
            // convention), so duplicate the json string exactly once instead
            // of calling get<String>() a second time.
            const auto str = json_object.get<String>();
            res->str_value_ = strdup(str.c_str());
            return res;
        }
        case nlohmann::json::value_t::array: {
            const u32 array_size = json_object.size();
            if (array_size == 0) {
                const auto error_info = "Empty json array!";
                LOG_ERROR(error_info);
                RecoverableError(Status::ImportFileFormatError(error_info));
                return nullptr;
            }
            // The first element decides the homogeneous array kind.
            switch (json_object[0].type()) {
                case nlohmann::json::value_t::boolean:
                case nlohmann::json::value_t::number_unsigned:
                case nlohmann::json::value_t::number_integer: {
                    auto res = MakeShared<ConstantExpr>(LiteralType::kIntegerArray);
                    res->long_array_.resize(array_size);
                    for (u32 i = 0; i < array_size; ++i) {
                        res->long_array_[i] = json_object[i].get<i64>();
                    }
                    return res;
                }
                case nlohmann::json::value_t::number_float: {
                    auto res = MakeShared<ConstantExpr>(LiteralType::kDoubleArray);
                    res->double_array_.resize(array_size);
                    for (u32 i = 0; i < array_size; ++i) {
                        res->double_array_[i] = json_object[i].get<double>();
                    }
                    return res;
                }
                case nlohmann::json::value_t::array: {
                    // Array of arrays: recurse per element (TensorArray rows).
                    auto res = MakeShared<ConstantExpr>(LiteralType::kSubArrayArray);
                    res->sub_array_array_.resize(array_size);
                    for (u32 i = 0; i < array_size; ++i) {
                        res->sub_array_array_[i] = BuildConstantExprFromJson(json_object[i]);
                    }
                    return res;
                }
                default: {
                    const auto error_info = fmt::format("Unrecognized json object type in array: {}", json_object.type_name());
                    LOG_ERROR(error_info);
                    RecoverableError(Status::ImportFileFormatError(error_info));
                    return nullptr;
                }
            }
        }
        default: {
            const auto error_info = fmt::format("Unrecognized json object type: {}", json_object.type_name());
            LOG_ERROR(error_info);
            RecoverableError(Status::ImportFileFormatError(error_info));
            return nullptr;
        }
    }
}

void PhysicalImport::JSONLRowHandler(const nlohmann::json &line_json, Vector<ColumnVector> &column_vectors) {
Expand Down Expand Up @@ -667,42 +701,11 @@ void PhysicalImport::JSONLRowHandler(const nlohmann::json &line_json, Vector<Col
}
break;
}
case kTensor: {
auto embedding_info = static_cast<EmbeddingInfo *>(column_vector.data_type()->type_info().get());
// SizeT dim = embedding_info->Dimension();
switch (embedding_info->Type()) {
case kElemBit: {
AppendJsonTensorToColumn<bool>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
case kElemInt8: {
AppendJsonTensorToColumn<i8>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
case kElemInt16: {
AppendJsonTensorToColumn<i16>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
case kElemInt32: {
AppendJsonTensorToColumn<i32>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
case kElemInt64: {
AppendJsonTensorToColumn<i64>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
case kElemFloat: {
AppendJsonTensorToColumn<float>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
case kElemDouble: {
AppendJsonTensorToColumn<double>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
default: {
UnrecoverableError("Not implement: Embedding type.");
}
}
case kTensor:
case kTensorArray: {
// build ConstantExpr
SharedPtr<ConstantExpr> const_expr = BuildConstantExprFromJson(line_json[column_def->name_]);
column_vector.AppendByConstantExpr(const_expr.get());
break;
}
default: {
Expand Down
14 changes: 0 additions & 14 deletions src/parser/type/complex/embedding_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,20 +196,6 @@ struct EmbeddingType {
EmbeddingType &operator=(const EmbeddingType &other) = delete;

EmbeddingType &operator=(EmbeddingType &&other) = delete;
/*
EmbeddingType &operator=(EmbeddingType &&other) noexcept {
if (this == &other)
return *this;
if (ptr != nullptr) {
// LOG_TRACE("Target embedding isn't null, need to manually SetNull or Reset");
Reset();
}
const_cast<bool &>(new_allocated_) = other.new_allocated_;
ptr = other.ptr;
other.ptr = nullptr;
return *this;
}
*/

void Init(const void *ptr, size_t size);

Expand Down
4 changes: 0 additions & 4 deletions src/parser/type/complex/tensor_array_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

#pragma once

#include "tensor_type.h"

namespace infinity {

#pragma pack(1)
Expand All @@ -24,8 +22,6 @@ struct TensorArrayType {
uint16_t tensor_num_ = 0;
uint16_t chunk_id_ = 0;
uint32_t chunk_offset_ = 0;

//[[nodiscard]] static std::string TensorArray2String(char *tensor_ptr, EmbeddingDataType type, size_t embedding_dimension, size_t embedding_num);
};

static_assert(sizeof(TensorArrayType) == sizeof(uint64_t));
Expand Down
139 changes: 25 additions & 114 deletions src/storage/column_vector/column_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ import buffer_manager;
import status;
import logical_type;
import embedding_info;
import base_expression;
import value_expression;
import expression_binder;
import cast_function;
import bound_cast_func;
import cast_expression;
import expression_evaluator;
import expression_state;

import block_column_entry;

Expand Down Expand Up @@ -1635,120 +1643,23 @@ void ColumnVector::AppendByStringView(std::string_view sv, char delimiter) {
}

void ColumnVector::AppendByConstantExpr(const ConstantExpr *const_expr) {
switch (data_type_->type()) {
case kBoolean: {
bool v = const_expr->bool_value_;
AppendByPtr(reinterpret_cast<const_ptr_t>(&v));
break;
}
case kTinyInt: {
i8 v = static_cast<i8>(const_expr->integer_value_);
AppendByPtr(reinterpret_cast<const_ptr_t>(&v));
break;
}
case kSmallInt: {
i16 v = static_cast<i16>(const_expr->integer_value_);
AppendByPtr(reinterpret_cast<const_ptr_t>(&v));
break;
}
case kInteger: {
i32 v = static_cast<i32>(const_expr->integer_value_);
AppendByPtr(reinterpret_cast<const_ptr_t>(&v));
break;
}
case kBigInt: {
i64 v = const_expr->integer_value_;
AppendByPtr(reinterpret_cast<const_ptr_t>(&v));
break;
}
case kFloat: {
float v = static_cast<float>(const_expr->double_value_);
AppendByPtr(reinterpret_cast<const_ptr_t>(&v));
break;
}
case kDouble: {
double v = const_expr->double_value_;
AppendByPtr(reinterpret_cast<const_ptr_t>(&v));
break;
}
case kVarchar: {
std::string_view str_view = const_expr->str_value_;
AppendByStringView(str_view, ',');
break;
}
case kTensor:
case kTensorArray: {
// TODO: used by default value?
UnrecoverableError("Need fix!");
break;
}
case kEmbedding: {
auto embedding_info = static_cast<EmbeddingInfo *>(data_type_->type_info().get());
// SizeT dim = embedding_info->Dimension();
switch (embedding_info->Type()) {
case kElemInt8: {
Vector<i8> embedding;
embedding.reserve(const_expr->long_array_.size());
std::transform(const_expr->long_array_.begin(), const_expr->long_array_.end(), std::back_inserter(embedding), [](auto &v) {
return static_cast<i8>(v);
});
AppendByPtr(reinterpret_cast<const_ptr_t>(embedding.data()));
break;
}
case kElemInt16: {
Vector<i16> embedding;
embedding.reserve(const_expr->long_array_.size());
std::transform(const_expr->long_array_.begin(), const_expr->long_array_.end(), std::back_inserter(embedding), [](auto &v) {
return static_cast<i16>(v);
});
AppendByPtr(reinterpret_cast<const_ptr_t>(embedding.data()));
break;
}
case kElemInt32: {
Vector<i32> embedding;
embedding.reserve(const_expr->long_array_.size());
std::transform(const_expr->long_array_.begin(), const_expr->long_array_.end(), std::back_inserter(embedding), [](auto &v) {
return static_cast<i32>(v);
});
AppendByPtr(reinterpret_cast<const_ptr_t>(embedding.data()));
break;
}
case kElemInt64: {
Vector<i64> embedding;
embedding.reserve(const_expr->long_array_.size());
std::transform(const_expr->long_array_.begin(), const_expr->long_array_.end(), std::back_inserter(embedding), [](auto &v) {
return v;
});
AppendByPtr(reinterpret_cast<const_ptr_t>(embedding.data()));
break;
}
case kElemFloat: {
Vector<float> embedding;
embedding.reserve(const_expr->double_array_.size());
std::transform(const_expr->double_array_.begin(), const_expr->double_array_.end(), std::back_inserter(embedding), [](auto &v) {
return static_cast<float>(v);
});
AppendByPtr(reinterpret_cast<const_ptr_t>(embedding.data()));
break;
}
case kElemDouble: {
Vector<i8> embedding;
embedding.reserve(const_expr->double_array_.size());
std::transform(const_expr->double_array_.begin(), const_expr->double_array_.end(), std::back_inserter(embedding), [](auto &v) {
return v;
});
AppendByPtr(reinterpret_cast<const_ptr_t>(embedding.data()));
break;
}
default: {
UnrecoverableError("Not implement: Embedding type.");
}
}
break;
}
default: {
UnrecoverableError("Not implement: Invalid data type.");
}
ExpressionBinder tmp_binder(nullptr);
auto expr = tmp_binder.BuildValueExpr(*const_expr, nullptr, 0, false);
auto value_expr = std::dynamic_pointer_cast<ValueExpression>(expr);
if (value_expr->Type() == *data_type()) {
auto value_to_insert = value_expr->GetValue();
AppendValue(value_to_insert);
} else {
// try cast
BoundCastFunc cast = CastFunction::GetBoundFunc(value_expr->Type(), *data_type());
SharedPtr<BaseExpression> cast_expr = MakeShared<CastExpression>(cast, expr, *data_type());
SharedPtr<ExpressionState> expr_state = ExpressionState::CreateState(cast_expr);
SharedPtr<ColumnVector> output_column_vector = ColumnVector::Make(data_type());
output_column_vector->Initialize(ColumnVectorType::kConstant, 1);
ExpressionEvaluator evaluator;
evaluator.Init(nullptr);
evaluator.Execute(cast_expr, expr_state, output_column_vector);
AppendWith(*output_column_vector, 0, 1);
}
}

Expand Down

0 comments on commit 005bc86

Please sign in to comment.