Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel process features #2216

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
127 changes: 117 additions & 10 deletions catboost/python-package/catboost/_catboost.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ from util.generic.array_ref cimport TArrayRef, TConstArrayRef
from util.generic.hash cimport THashMap
from util.generic.hash_set cimport THashSet
from util.generic.maybe cimport TMaybe
from util.generic.ptr cimport THolder, TIntrusivePtr, MakeHolder
from util.generic.ptr cimport TAtomicSharedPtr, THolder, TIntrusivePtr, MakeHolder
from util.generic.string cimport TString, TStringBuf
from util.generic.vector cimport TVector
from util.system.types cimport ui8, ui16, ui32, ui64, i32, i64
Expand Down Expand Up @@ -954,6 +954,8 @@ cdef inline float _FloatOrNanFromString(const TString& s) except *:
cdef extern from "catboost/libs/gpu_config/interface/get_gpu_device_count.h" namespace "NCB":
cdef int GetGpuDeviceCount() except +ProcessException

ctypedef void (*task_callback_ptr) (int) nogil
ctypedef void (*callback_ptr) (int, int) nogil

cdef extern from "catboost/python-package/catboost/helpers.h":
cdef void SetPythonInterruptHandler() nogil
Expand All @@ -979,6 +981,11 @@ cdef extern from "catboost/python-package/catboost/helpers.h":
int threadCount,
ui64 cpuUsedRamLimit
) except +ProcessException
cdef void CallInParallel(ILocalExecutor* executor, task_callback_ptr callback, int block_count) nogil
cdef size_t column_block_size
cdef size_t objects_in_column
cdef callback_ptr CallbackForColumnProcessing
cdef TVector[TString]* processing_result


cdef extern from "catboost/python-package/catboost/helpers.h":
Expand Down Expand Up @@ -2691,12 +2698,78 @@ cdef _set_features_order_data_pd_data_frame_categorical_column(
)


cdef class ColumnProcessor:

def __cinit__(self, object callback, object column_array):
self.callback = callback
self.column_array = column_array
print(f"processing_result[0][0] = {processing_result[0][0]}")

def __call__(self, start, end):
for idx in range(start, end):
self.callback(idx, self.column_array[idx], processing_result[0][idx])


cdef void call_python_code(int block_id) nogil :
CallbackForColumnProcessing(
block_id * column_block_size,
min(
(block_id + 1) * column_block_size,
objects_in_column
)
)

g_column_array = None
g_object_process_callback = None

cdef void processor(int start, int end) :
global g_object_process_callback, g_column_array, processing_result
for idx in range(start, end):
g_object_process_callback(idx, g_column_array[idx], processing_result[0][idx])

cdef parallel_process_features_column_to_vector(
# TAtomicSharedPtr[TTbbLocalExecutor] executor,
# ILocalExecutor* executor,
int thread_count,
column_array,
TVector[TString]* result,
object object_process_callback,
size_t block_size = 1024 * 16
) :
global processing_result, column_block_size, CallbackForColumnProcessing, objects_in_column, g_column_array, g_object_process_callback
processing_result = result
column_block_size = block_size
CallbackForColumnProcessing = <callback_ptr>(&processor)
objects_in_column = len(column_array)
g_column_array = column_array
g_object_process_callback = object_process_callback

cdef size_t block_count = (objects_in_column + block_size - 1) // block_size
cdef THolder[TTbbLocalExecutor] local_executor = MakeHolder[TTbbLocalExecutor](thread_count)
cdef ILocalExecutor* executor_ptr = <ILocalExecutor*>local_executor.Get()


#print(f"objects_in_column = {objects_in_column}, block_size = {block_size}, block_count = {block_count}")
#for block_id in range(block_count):
# call_python_code(block_id)
#for idx in range(objects_in_column):
# g_object_process_callback(idx, g_column_array[idx], processing_result[0][idx])

#with nogil:
CallInParallel(
executor_ptr,
call_python_code,
block_count
)


# returns new data holders array
cdef object _set_features_order_data_pd_data_frame(
data_frame,
bool_t has_separate_embedding_features_data,
const TFeaturesLayout* features_layout,
IRawFeaturesOrderDataVisitor* builder_visitor
IRawFeaturesOrderDataVisitor* builder_visitor,
int thread_count
):
cdef TVector[ui32] main_data_feature_idx_to_dst_feature_idx = _get_main_data_feature_idx_to_dst_feature_idx(features_layout, has_separate_embedding_features_data)
cdef TVector[bool_t] is_cat_feature_mask = _get_is_feature_type_mask(features_layout, EFeatureType_Categorical)
Expand Down Expand Up @@ -2724,7 +2797,8 @@ cdef object _set_features_order_data_pd_data_frame(

string_factor_data.reserve(doc_count)

new_data_holders = []
new_data_holders = []

for src_flat_feature_idx, (column_name, column_data) in enumerate(data_frame.items()):
flat_feature_idx = main_data_feature_idx_to_dst_feature_idx[src_flat_feature_idx]
if isinstance(column_data.dtype, pd.SparseDtype):
Expand Down Expand Up @@ -2754,15 +2828,47 @@ cdef object _set_features_order_data_pd_data_frame(
else:
column_values = column_data.to_numpy()
if is_cat_feature_mask[flat_feature_idx]:
string_factor_data.clear()
for doc_idx in range(doc_count):
string_factor_data.clear()
string_factor_data.resize(doc_count)

# cdef get_cat_factor_bytes_representation(
# int non_default_doc_idx, # can be -1 - that means default value for sparse data
# ui32 feature_idx,
# object factor,
# TString* factor_strbuf
#):

def call_back(
int non_default_doc_idx,
object factor,
TString& factor_strbuf
):
get_cat_factor_bytes_representation(
doc_idx,
non_default_doc_idx,
flat_feature_idx,
column_values[doc_idx],
&factor_string
factor,
&factor_strbuf
)
string_factor_data.push_back(factor_string)

parallel_process_features_column_to_vector(
thread_count,
column_values, # column_array,
&string_factor_data, # TVector[TString]* result,
call_back # object object_process_callback,
)

#for doc_idx in range(doc_count):
# call_back(
# doc_idx,
# column_values[doc_idx],
# string_factor_data[doc_idx]
# )
#get_cat_factor_bytes_representation(
# doc_idx,
# flat_feature_idx,
# column_values[doc_idx],
# &string_factor_data[doc_idx]
#)
builder_visitor[0].AddCatFeature(flat_feature_idx, <TConstArrayRef[TString]>string_factor_data)
elif is_text_feature_mask[flat_feature_idx]:
string_factor_data.clear()
Expand Down Expand Up @@ -3821,7 +3927,8 @@ cdef class _PoolBase:
data,
embedding_features_data is not None,
features_layout,
builder_visitor
builder_visitor,
thread_count
)
elif isinstance(data, scipy.sparse.spmatrix):
new_data_holders = _set_features_order_data_scipy_sparse_matrix(
Expand Down
18 changes: 18 additions & 0 deletions catboost/python-package/catboost/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,21 @@ void TrainEvalSplit(
*evalDataProvider = getSubset(postShuffleTestIndices);
}
}

size_t objects_in_column;
size_t column_block_size;
callback_ptr CallbackForColumnProcessing;
TVector<TString>* processing_result;

#include <iostream>

void CallInParallel(NPar::ILocalExecutor* executor, void(*callback) (int block_id), size_t block_count) {
// std::cout << "CallInParallel" << std::endl;
auto task = [&](int id) {
callback(id);
};
executor->ExecRange(task, 0, block_count, NPar::TLocalExecutor::WAIT_COMPLETE);
// for (size_t id = 0; id != block_count; ++id) {
// task(id);
// }
}
8 changes: 8 additions & 0 deletions catboost/python-package/catboost/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,11 @@ void TrainEvalSplit(
);


extern size_t objects_in_column;
extern size_t column_block_size;
extern TVector<TString>* processing_result;

typedef void (*callback_ptr) (int, int);
extern callback_ptr CallbackForColumnProcessing;

void CallInParallel(NPar::ILocalExecutor* executor, void(*callback) (int block_id), size_t block_count);