Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid projection more often in DecompressChunk node #6859

Merged
merged 18 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 38 additions & 10 deletions tsl/src/nodes/decompress_chunk/compressed_batch.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state
CompressionColumnDescription *column_description = &dcontext->compressed_chunk_columns[i];
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
column_values->arrow = NULL;
const AttrNumber attr = AttrNumberGetAttrOffset(column_description->output_attno);
const AttrNumber attr = AttrNumberGetAttrOffset(column_description->custom_scan_attno);
column_values->output_value = &compressed_batch_current_tuple(batch_state)->tts_values[attr];
column_values->output_isnull = &compressed_batch_current_tuple(batch_state)->tts_isnull[attr];
const int value_bytes = get_typlen(column_description->typid);
Expand All @@ -179,10 +179,14 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state
*/
column_values->decompression_type = DT_Scalar;

*column_values->output_value =
getmissingattr(dcontext->decompressed_slot->tts_tupleDescriptor,
column_description->output_attno,
column_values->output_isnull);
/*
* We might use a custom targetlist-based scan tuple which has no
* default values, so the default values are fetched from the
* uncompressed chunk tuple descriptor.
*/
*column_values->output_value = getmissingattr(dcontext->uncompressed_chunk_tdesc,
column_description->uncompressed_chunk_attno,
column_values->output_isnull);
return;
}

Expand Down Expand Up @@ -399,9 +403,33 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_stat
for (; column_index < dcontext->num_data_columns; column_index++)
{
column_description = &dcontext->compressed_chunk_columns[column_index];
if (column_description->output_attno == var->varattno)
if (var->varno == INDEX_VAR)
{
break;
/*
* Reference into custom scan tlist, happens when we are using a
* non-default custom scan tuple.
*/
if (column_description->custom_scan_attno == var->varattno)
{
break;
}
}
else
{
/*
* Reference into uncompressed chunk tuple.
*
* Note that this is somewhat redundant, because this branch is
* taken when we do not use a custom scan tuple, and in this case
* the custom scan attno is the same as the uncompressed chunk attno,
* so the above branch would do as well. This difference might
* become relevant in the future, if we stop outputting the
* columns that are needed only for the vectorized quals.
*/
if (column_description->uncompressed_chunk_attno == var->varattno)
{
break;
}
}
}
Ensure(column_index < dcontext->num_data_columns,
Expand Down Expand Up @@ -734,8 +762,8 @@ compressed_batch_lazy_init(DecompressContext *dcontext, DecompressBatchState *ba
batch_state->per_batch_context = create_per_batch_mctx(dcontext);
Assert(batch_state->per_batch_context != NULL);

/* Get a reference to the output TupleTableSlot */
TupleTableSlot *decompressed_slot = dcontext->decompressed_slot;
/* Get a reference to the decompressed scan TupleTableSlot */
TupleTableSlot *decompressed_slot = dcontext->custom_scan_slot;

/*
* This code follows Postgres' MakeTupleTableSlot().
Expand Down Expand Up @@ -832,7 +860,7 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext,
Assert(i < dcontext->num_data_columns);
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
column_values->decompression_type = DT_Scalar;
AttrNumber attr = AttrNumberGetAttrOffset(column_description->output_attno);
AttrNumber attr = AttrNumberGetAttrOffset(column_description->custom_scan_attno);
Datum *output_value = &decompressed_tuple->tts_values[attr];
bool *output_isnull = &decompressed_tuple->tts_isnull[attr];
column_values->output_value = output_value;
Expand Down
5 changes: 0 additions & 5 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ typedef struct CompressionInfo

} CompressionInfo;

typedef struct ColumnCompressionInfo
{
bool bulk_decompression_possible;
} DecompressChunkColumnCompression;

typedef struct DecompressChunkPath
{
CustomPath custom_path;
Expand Down
24 changes: 19 additions & 5 deletions tsl/src/nodes/decompress_chunk/decompress_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,18 @@ typedef struct CompressionColumnDescription
bool by_value;

/*
* Attno of the decompressed column in the output of DecompressChunk node.
* Attno of the decompressed column in the scan tuple of DecompressChunk node.
* Negative values are special columns that do not have a representation in
* the decompressed chunk, but are still used for decompression. They should
* have the respective `type` field.
* the decompressed chunk, but are still used for decompression. The `type`
* field is set accordingly for these columns.
*/
AttrNumber output_attno;
AttrNumber custom_scan_attno;

/*
* Attno of this column in the uncompressed chunks. We use it to fetch the
* default value from the uncompressed chunk tuple descriptor.
*/
AttrNumber uncompressed_chunk_attno;

/*
* Attno of the compressed column in the input compressed chunk scan.
Expand Down Expand Up @@ -76,7 +82,15 @@ typedef struct DecompressContext
*/
MemoryContext bulk_decompression_context;

TupleTableSlot *decompressed_slot;
TupleTableSlot *custom_scan_slot;

/*
* The scan tuple descriptor might be different from the uncompressed chunk
* one, and it doesn't have the default column values in that case, so we
* have to fetch the default values from the uncompressed chunk tuple
* descriptor which we store here.
*/
TupleDesc uncompressed_chunk_tdesc;

PlanState *ps; /* Set for filtering and instrumentation */

Expand Down
38 changes: 27 additions & 11 deletions tsl/src/nodes/decompress_chunk/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,11 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
dcontext->num_columns_with_metadata = num_columns_with_metadata;
dcontext->compressed_chunk_columns =
palloc0(sizeof(CompressionColumnDescription) * num_columns_with_metadata);
dcontext->decompressed_slot = node->ss.ss_ScanTupleSlot;
dcontext->custom_scan_slot = node->ss.ss_ScanTupleSlot;
dcontext->uncompressed_chunk_tdesc = RelationGetDescr(node->ss.ss_currentRelation);
dcontext->ps = &node->ss.ps;

TupleDesc desc = dcontext->decompressed_slot->tts_tupleDescriptor;
TupleDesc desc = dcontext->custom_scan_slot->tts_tupleDescriptor;

/*
* Compressed columns go in front, and the rest go to the back, so we have
Expand All @@ -276,22 +277,22 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
{
CompressionColumnDescription column = {
.compressed_scan_attno = AttrOffsetGetAttrNumber(compressed_index),
.output_attno = list_nth_int(chunk_state->decompression_map, compressed_index),
.custom_scan_attno = list_nth_int(chunk_state->decompression_map, compressed_index),
.bulk_decompression_supported =
list_nth_int(chunk_state->bulk_decompression_column, compressed_index)
};

if (column.output_attno == 0)
if (column.custom_scan_attno == 0)
{
/* We are asked not to decompress this column, skip it. */
continue;
}

if (column.output_attno > 0)
if (column.custom_scan_attno > 0)
{
/* normal column that is also present in decompressed chunk */
Form_pg_attribute attribute =
TupleDescAttr(desc, AttrNumberGetAttrOffset(column.output_attno));
TupleDescAttr(desc, AttrNumberGetAttrOffset(column.custom_scan_attno));

column.typid = attribute->atttypid;
get_typlenbyval(column.typid, &column.value_bytes, &column.by_value);
Expand All @@ -300,11 +301,26 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
column.type = SEGMENTBY_COLUMN;
else
column.type = COMPRESSED_COLUMN;

if (cscan->custom_scan_tlist == NIL)
{
column.uncompressed_chunk_attno = column.custom_scan_attno;
}
else
{
Var *var =
castNode(Var,
castNode(TargetEntry,
list_nth(cscan->custom_scan_tlist,
AttrNumberGetAttrOffset(column.custom_scan_attno)))
->expr);
column.uncompressed_chunk_attno = var->varattno;
}
}
else
{
/* metadata columns */
switch (column.output_attno)
switch (column.custom_scan_attno)
{
case DECOMPRESS_CHUNK_COUNT_ID:
column.type = COUNT_COLUMN;
Expand All @@ -313,15 +329,15 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
column.type = SEQUENCE_NUM_COLUMN;
break;
default:
elog(ERROR, "Invalid column attno \"%d\"", column.output_attno);
elog(ERROR, "Invalid column attno \"%d\"", column.custom_scan_attno);
break;
}
}

if (column.output_attno > 0)
if (column.custom_scan_attno > 0)
{
/* Data column. */
Assert(current_compressed < num_columns_with_metadata);
Assert(current_compressed < num_data_columns);
dcontext->compressed_chunk_columns[current_compressed++] = column;
}
else
Expand All @@ -344,7 +360,7 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
chunk_state->batch_queue =
batch_queue_heap_create(num_data_columns,
chunk_state->sortinfo,
dcontext->decompressed_slot->tts_tupleDescriptor,
dcontext->custom_scan_slot->tts_tupleDescriptor,
&BatchQueueFunctionsHeap);
chunk_state->exec_methods.ExecCustomScan = decompress_chunk_exec_heap;
}
Expand Down