Sharded compaction #3537
base: main
Conversation
…Remove compaction level of all grouping, remove active window.
```diff
@@ -75,6 +75,7 @@ func (i *rawIterator) Next(context.Context) (common.ID, parquet.Row, error) {
 	}
 
 	if errors.Is(err, io.EOF) {
+		i.pool.Put(rows[0])
```
This fixes a "memory leak" where the buffer wasn't returned to the pool. This actually has a significant effect on memory because this isn't happening once at the end, but on every loop of the multiblock iterator. Once 1 block is exhausted it begins leaking a pooled buffer on every call. The better fix is to make the multiblock iterator stop calling Next() once an iter is exhausted, but it was going to be a lot more involved.
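To illustrate the pattern being fixed, here is a minimal, self-contained Go sketch. The types and names are illustrative only (this is not Tempo's actual `rawIterator`); the point is that a buffer is borrowed from a pool on every call to `Next()`, so the `io.EOF` path must return it too, otherwise each call against an exhausted iterator leaks one pooled buffer.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"sync"
)

// bufferedIterator is an illustrative stand-in for the pooled-buffer pattern
// discussed above; every call to next() borrows a buffer from the pool.
type bufferedIterator struct {
	pool sync.Pool
	r    io.Reader
}

func (i *bufferedIterator) next() ([]byte, error) {
	buf := i.pool.Get().([]byte)

	n, err := i.r.Read(buf)
	if errors.Is(err, io.EOF) {
		// The fix: hand the buffer back on the EOF path too. Without this,
		// every further call against an exhausted iterator leaks one buffer.
		i.pool.Put(buf)
		return nil, io.EOF
	}
	if err != nil {
		i.pool.Put(buf)
		return nil, err
	}
	// On success the caller owns buf and is responsible for returning it.
	return buf[:n], nil
}

func main() {
	it := &bufferedIterator{r: bytes.NewReader([]byte("some rows"))}
	it.pool.New = func() any { return make([]byte, 16) }

	for {
		row, err := it.next()
		if errors.Is(err, io.EOF) {
			break
		}
		fmt.Printf("%q\n", row)
	}
}
```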
Nice work!
Thank you for adding some information to the configuration docs. Will we need additional information in the docs that talks about when to use this configuration?
long overdue work. looks great. added some Qs but fine with things as is.
if you need my approval please ask. the only reason i'm not approving is b/c you and @mapno have been handling these PRs
```go
// ship block to backend if done
if currentBlock != nil && cmd.CutBlock(currentBlock.meta, lowestID) {
	currentBlockPtrCopy := currentBlock
	currentBlockPtrCopy.meta.StartTime = minBlockStart
```
won't this adjust the original "currentBlock" since they point to the same thing?
Hmm, this block was moved from the bottom of the loop to the top so that the `CutBlock` callback can inspect the next trace (`lowestID`). You're right, it does seem to be out of date and the copy is no longer needed. I'll take a look.
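For reference, a minimal Go sketch of the aliasing being discussed (the structs are illustrative, not Tempo's real block types): copying a pointer variable does not copy the value it points to, so a write through the copy is visible through the original.

```go
package main

import "fmt"

// Illustrative types only.
type blockMeta struct{ StartTime int64 }
type block struct{ meta *blockMeta }

func main() {
	currentBlock := &block{meta: &blockMeta{StartTime: 100}}

	// Copying the pointer does not copy the block: both variables point at
	// the same struct, so the assignment below is visible through currentBlock.
	currentBlockPtrCopy := currentBlock
	currentBlockPtrCopy.meta.StartTime = 42

	fmt.Println(currentBlock.meta.StartTime) // prints 42, not 100
}
```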
Hi everyone: Putting this back to draft because it is not yet ready for large scale. Testing it out in a large cluster is showing it is difficult to achieve the balance between splitting and combining blocks, even with tuning. Would like to review the switch statement and the priorities of each group of blocks. When there is an imbalance, the result is that compactors are less effective and the blocklist grows more than is acceptable: i.e. if the compactor spends too much time splitting blocks, then the backend is composed of (too) many small blocks, and if the compactor spends too much time combining, then ingester level-0 blocks are neglected.
What this PR does:
This is the largest change to compaction in years. It is a new sharding compactor that splits blocks up by trace ID divisions. For example: instead of having 2 blocks with the entire range of trace IDs 00..FF, the sharding compactor can combine and split them into 2 blocks with the ranges 00..80 and 80..FF where each block contains half of the range it did before.
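To make the trace ID split concrete, here is a small hypothetical Go sketch that maps a trace ID to one of N shards by bucketing its leading byte. This is only an illustration of the idea described above, not Tempo's actual shard-boundary logic.

```go
package main

import "fmt"

// shardOf maps a trace ID to one of shardCount shards by bucketing the
// leading byte of the ID (hypothetical; the real boundary math may differ).
func shardOf(traceID []byte, shardCount int) int {
	if shardCount <= 1 || len(traceID) == 0 {
		return 0
	}
	return int(traceID[0]) * shardCount / 256
}

func main() {
	// With 2 shards: IDs starting 00..7F land in shard 0, 80..FF in shard 1.
	for _, lead := range []byte{0x10, 0x7f, 0x80, 0xff} {
		fmt.Printf("trace IDs starting with %02x -> shard %d\n", lead, shardOf([]byte{lead}, 2))
	}
}
```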
There are several benefits from this approach:
The shard count is configurable globally and per-tenant. Setting it to 2 or higher activates the new sharding compactor; 0 or 1 falls back to the existing compactor. Useful values are expected to be 2 through 8. Values higher than 8 could be interesting, but at the cost of greatly increasing the size of the blocklist.
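A tiny Go sketch of the activation rule described above; the function and names are hypothetical, not Tempo's real configuration code.

```go
package main

import "fmt"

// pickCompactor illustrates the rule: a shard count of 0 or 1 keeps the
// existing compactor, 2 or higher enables the sharding compactor.
func pickCompactor(shardCount int) string {
	if shardCount >= 2 {
		return "sharded"
	}
	return "legacy"
}

func main() {
	for _, n := range []int{0, 1, 2, 8} {
		fmt.Printf("shard count %d -> %s compactor\n", n, pickCompactor(n))
	}
}
```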
Description
How the sharding compactor works can be considered as 3 phases:
1. Selects blocks with `CompactionLevel=0` and splits them. This is a combination of combine/split as described previously, so that up to 4 (`maxInputBlocks`) ingester blocks will be rewritten as N sharded blocks. This step feeds work to the next step.
2. Selects blocks with `CompactionLevel>0` where the block min/max trace IDs fall within the same shard (i.e. the block is "well-sharded").

Which issue(s) this PR fixes:
Fixes n/a
Checklist
- `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`