Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge property buckets #4729

Draft
wants to merge 71 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
819799b
Merge buckets
donomii Oct 4, 2023
863a9ec
Regenerate reformat
donomii Oct 4, 2023
2283acf
Merge buckets
donomii Oct 4, 2023
f21e635
Regenerate reformat
donomii Oct 4, 2023
9ed7033
Merge branch 'more-merge-buckets-rebase' of https://github.com/weavia…
donomii Oct 5, 2023
2a2e232
Fixed spurios error messages
donomii Oct 23, 2023
f2f0838
Merge branch 'main' into more-merge-buckets-rebase-main-merge-2
donomii Apr 19, 2024
a9ba776
Merge shard.go
donomii Apr 19, 2024
9fc2f68
Fix more
donomii Apr 19, 2024
c1bd143
Revert "refact: thread safety for bucket creation and loading (#4422)"
donomii Apr 19, 2024
f685283
Moar
donomii Apr 19, 2024
74b5c1e
Fix merge
donomii Apr 19, 2024
fa4d272
fix moar
donomii Apr 19, 2024
7e2f03d
.
donomii Apr 19, 2024
fd1da59
.
donomii Apr 19, 2024
69888c4
.
donomii Apr 19, 2024
89b81f8
.
donomii Apr 19, 2024
577e074
.
donomii Apr 19, 2024
7f4355d
.
donomii Apr 19, 2024
b987a36
.
donomii Apr 19, 2024
9a60665
migrator
donomii Apr 19, 2024
2a1720d
.
donomii Apr 19, 2024
1a68937
.
donomii Apr 19, 2024
6315376
.
donomii Apr 19, 2024
c7cbbc1
.
donomii Apr 21, 2024
3293cbf
Merge branch 'main' into more-merge-buckets-rebase-main-merge-2
donomii Apr 21, 2024
3b1dd35
Regenerate reformat
donomii Apr 21, 2024
ded6ee5
.
donomii Apr 21, 2024
e8ebe2d
Regenerate reformat
donomii Apr 21, 2024
71be6f2
Regenerate reformat
donomii Apr 21, 2024
81ad61c
Regenerate reformat
donomii Apr 21, 2024
5e5ff58
Regenerate reformat
donomii Apr 21, 2024
0600cc7
.
donomii Apr 21, 2024
c643fe2
Regenerate reformat
donomii Apr 21, 2024
61d8fa5
.
donomii Apr 21, 2024
2af6d8b
Merge branch 'main' into more-merge-buckets-rebase-main-merge-2
donomii Apr 22, 2024
0ffc703
.
donomii Apr 22, 2024
4d320e4
.
donomii Apr 22, 2024
5f1e46a
.
donomii Apr 22, 2024
9aabdb1
Merge branch 'main' into more-merge-buckets-rebase-main-merge-2
donomii Apr 22, 2024
5ae3d8f
.
donomii Apr 22, 2024
d781721
Merge branch 'more-merge-buckets-rebase-main-merge-2' of github.com:w…
donomii Apr 22, 2024
0b75b8d
.
donomii Apr 22, 2024
b037ac9
.
donomii Apr 22, 2024
722bdea
Regenerate reformat
donomii Apr 22, 2024
63fdd2c
Regenerate reformat
donomii Apr 22, 2024
6963dd8
Merge branch 'main' into more-merge-buckets-rebase-main-merge-2
donomii Apr 22, 2024
be8bba8
.
donomii Apr 22, 2024
22b79a5
Regenerate reformat
donomii Apr 22, 2024
17d3a91
Fix more tests
donomii Apr 29, 2024
4defd4c
Merge branch 'main' into more-merge-buckets-rebase-main-merge-2
donomii May 2, 2024
0a777c7
Merge branch 'main' into more-merge-buckets-rebase-main-merge-2
donomii May 2, 2024
e477f3e
Merge branch 'main' into more-merge-buckets-rebase-main-merge-2
donomii May 3, 2024
c60c340
Fix again
donomii May 3, 2024
8faebd2
Remove debugging
donomii May 3, 2024
c00ab9f
Use var for go test
donomii May 3, 2024
b943a51
Add new cursors. Rename helpers, add prefix/postfix funcs
donomii May 3, 2024
4889532
.
donomii May 4, 2024
98938ed
.
donomii May 4, 2024
7b2c868
.
donomii May 4, 2024
b20d7c2
.
donomii May 4, 2024
0c08895
.
donomii May 4, 2024
0f293c8
.
donomii May 4, 2024
437b9d2
.
donomii May 5, 2024
6411d27
.
donomii May 5, 2024
a99b25d
.
donomii May 5, 2024
93af236
.
donomii May 7, 2024
6d0dc58
Merge branch 'main' into more-merge-buckets-rebase-main-merge-2
donomii May 17, 2024
9def0c4
Regenerate reformat
donomii May 17, 2024
e336824
Fix init for old buckets code
donomii May 17, 2024
5f3be80
Merge branch 'main' into more-merge-buckets-rebase-main-merge-2
donomii May 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions adapters/handlers/rest/configure_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/weaviate/weaviate/adapters/repos/classifications"
"github.com/weaviate/weaviate/adapters/repos/db"
"github.com/weaviate/weaviate/adapters/repos/db/inverted"
"github.com/weaviate/weaviate/adapters/repos/db/lsmkv"
modulestorage "github.com/weaviate/weaviate/adapters/repos/modules"
schemarepo "github.com/weaviate/weaviate/adapters/repos/schema"
rCluster "github.com/weaviate/weaviate/cluster"
Expand Down Expand Up @@ -137,9 +138,18 @@ func getCores() (int, error) {
}

func MakeAppState(ctx context.Context, options *swag.CommandLineOptionsGroup) *state.State {
if os.Getenv("ENABLE_MERGED_PROPERTY_BUCKETS") != "" && strings.ToLower(os.Getenv("ENABLE_MERGED_PROPERTY_BUCKETS")) != "false" {
lsmkv.FeatureUseMergedBuckets = true
} else {
lsmkv.FeatureUseMergedBuckets = false
}
appState := startupRoutine(ctx, options)
setupGoProfiling(appState.ServerConfig.Config, appState.Logger)

if lsmkv.FeatureUseMergedBuckets {
appState.Logger.WithField("action", "startup").Warnf("Merged property buckets are enabled. This is an experimental feature and may be removed in the future. Please report any issues you encounter.")
}

if appState.ServerConfig.Config.Monitoring.Enabled {
appState.TenantActivity = tenantactivity.NewHandler()

Expand Down
12 changes: 12 additions & 0 deletions adapters/repos/db/aggregations_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,25 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaviate/weaviate/adapters/repos/db/helpers"
"github.com/weaviate/weaviate/entities/aggregation"
"github.com/weaviate/weaviate/entities/filters"
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/entities/schema"
"github.com/weaviate/weaviate/usecases/memwatch"
)

func Test_KeyHelpers(t *testing.T) {
prefix := []byte("testprefix")
data := []byte("testdata")

composite_key := helpers.MakePropertyKey(prefix, data)
assert.True(t, helpers.MatchesPropertyKey(prefix, composite_key))
recovered_key := helpers.UnMakePropertyKey(prefix, composite_key)

assert.Equal(t, data, recovered_key)
}

func Test_Aggregations(t *testing.T) {
dirName := t.TempDir()

Expand Down
9 changes: 7 additions & 2 deletions adapters/repos/db/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/weaviate/weaviate/adapters/repos/db/helpers"
"github.com/weaviate/weaviate/adapters/repos/db/inverted"
"github.com/weaviate/weaviate/adapters/repos/db/inverted/stopwords"
"github.com/weaviate/weaviate/adapters/repos/db/inverted/tracker"
"github.com/weaviate/weaviate/adapters/repos/db/lsmkv"
"github.com/weaviate/weaviate/adapters/repos/db/roaringset"
"github.com/weaviate/weaviate/entities/aggregation"
Expand All @@ -43,10 +44,13 @@ type Aggregator struct {
vectorIndex vectorIndex
stopwords stopwords.StopwordDetector
shardVersion uint16
propLenTracker *inverted.JsonPropertyLengthTracker
propLengths *inverted.JsonPropertyLengthTracker
propertyIds *tracker.JsonPropertyIdTracker
isFallbackToSearchable inverted.IsFallbackToSearchable
tenant string
nestedCrossRefLimit int64
PropertyName string
propLenTracker *inverted.JsonPropertyLengthTracker
bitmapFactory *roaringset.BitmapFactory
modules *modules.Provider
}
Expand All @@ -57,7 +61,7 @@ func New(store *lsmkv.Store, params aggregation.Params,
vectorIndex vectorIndex, logger logrus.FieldLogger,
propLenTracker *inverted.JsonPropertyLengthTracker,
isFallbackToSearchable inverted.IsFallbackToSearchable,
tenant string, nestedCrossRefLimit int64,
tenant string, propertyIds *tracker.JsonPropertyIdTracker, nestedCrossRefLimit int64,
bitmapFactory *roaringset.BitmapFactory,
modules *modules.Provider,
) *Aggregator {
Expand All @@ -73,6 +77,7 @@ func New(store *lsmkv.Store, params aggregation.Params,
propLenTracker: propLenTracker,
isFallbackToSearchable: isFallbackToSearchable,
tenant: tenant,
propertyIds: propertyIds,
nestedCrossRefLimit: nestedCrossRefLimit,
bitmapFactory: bitmapFactory,
modules: modules,
Expand Down
2 changes: 1 addition & 1 deletion adapters/repos/db/aggregator/filtered.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (fa *filteredAggregator) bm25Objects(ctx context.Context, kw *searchparams.

objs, scores, err := inverted.NewBM25Searcher(cfg.BM25, fa.store, fa.getSchema.ReadOnlyClass,
propertyspecific.Indices{}, fa.classSearcher,
fa.GetPropertyLengthTracker(), fa.logger, fa.shardVersion,
fa.GetPropertyLengthTracker(), fa.logger, fa.shardVersion, fa.propertyIds,
).BM25F(ctx, nil, fa.params.ClassName, *fa.params.ObjectLimit, *kw)
if err != nil {
return nil, nil, fmt.Errorf("bm25 objects: %w", err)
Expand Down
10 changes: 7 additions & 3 deletions adapters/repos/db/aggregator/grouper.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,16 @@ func (g *grouper) hybrid(ctx context.Context, allowList helpers.AllowList, modul
return ids, nil
}

func (g *grouper) addElementById(s *models.PropertySchema, docID uint64) error {
if s == nil {
func (g *grouper) addElementById(propertySchemaP *models.PropertySchema, docID uint64) error {
if propertySchemaP == nil {
return nil
}

item, ok := (*s).(map[string]interface{})[g.params.GroupBy.Property.String()]
key := g.params.GroupBy.Property.String()
S := *propertySchemaP
m := S.(map[string]interface{})
item, ok := m[key]

if !ok {
return nil
}
Expand Down
3 changes: 1 addition & 2 deletions adapters/repos/db/aggregator/hybrid.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ func (a *Aggregator) bm25Objects(ctx context.Context, kw *searchparams.KeywordRa

objs, dists, err := inverted.NewBM25Searcher(cfg.BM25, a.store, a.getSchema.ReadOnlyClass,
propertyspecific.Indices{}, a.classSearcher,
a.GetPropertyLengthTracker(), a.logger, a.shardVersion,
).BM25F(ctx, nil, a.params.ClassName, *a.params.ObjectLimit, *kw)
a.GetPropertyLengthTracker(), a.logger, a.shardVersion, a.propertyIds).BM25F(ctx, nil, a.params.ClassName, *a.params.ObjectLimit, *kw)
if err != nil {
return nil, nil, fmt.Errorf("bm25 objects: %w", err)
}
Expand Down
97 changes: 68 additions & 29 deletions adapters/repos/db/aggregator/unfiltered_type_specific.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ func (ua unfilteredAggregator) boolProperty(ctx context.Context,
Type: aggregation.PropertyTypeBoolean,
}

b := ua.store.Bucket(helpers.BucketFromPropNameLSM(prop.Name.String()))
if b == nil {
return nil, errors.Errorf("could not find bucket for prop %s", prop.Name)
b, err := lsmkv.FetchMeABucket(ua.store, "filterable_properties", helpers.BucketFromPropertyNameLSM(prop.Name.String()), prop.Name.String(), ua.propertyIds)
if err != nil {
return nil, errors.Errorf("could not create proxy bucket for prop %s: %v", prop.Name, err)
}

agg := newBoolAggregator()
Expand All @@ -44,6 +44,10 @@ func (ua unfilteredAggregator) boolProperty(ctx context.Context,
defer c.Close()

for k, v := c.First(); k != nil; k, v = c.Next() {
if !helpers.MatchesPropertyKey(b.PropertyPrefix(), k) {
continue
}
k = helpers.UnMakePropertyKey(b.PropertyPrefix(), k)
err := ua.parseAndAddBoolRowRoaringSet(agg, k, v)
if err != nil {
return nil, err
Expand All @@ -54,6 +58,10 @@ func (ua unfilteredAggregator) boolProperty(ctx context.Context,
defer c.Close()

for k, v := c.First(); k != nil; k, v = c.Next() {
if !helpers.MatchesPropertyKey(b.PropertyPrefix(), k) {
continue
}
k = helpers.UnMakePropertyKey(b.PropertyPrefix(), k)
err := ua.parseAndAddBoolRowSet(agg, k, v)
if err != nil {
return nil, err
Expand Down Expand Up @@ -84,6 +92,10 @@ func (ua unfilteredAggregator) boolArrayProperty(ctx context.Context,
defer c.Close()

for k, v := c.First(); k != nil; k, v = c.Next() {
if !helpers.MatchesPropertyKey(b.PropertyPrefix(), k) {
continue
}

err := ua.parseAndAddBoolArrayRow(agg, v, prop.Name)
if err != nil {
return nil, err
Expand All @@ -98,8 +110,7 @@ func (ua unfilteredAggregator) boolArrayProperty(ctx context.Context,
func (ua unfilteredAggregator) parseAndAddBoolRowSet(agg *boolAggregator, k []byte, v [][]byte) error {
if len(k) != 1 {
// we expect to see a single byte for a marshalled bool
return fmt.Errorf("unexpected key length on inverted index, "+
"expected 1: got %d", len(k))
return fmt.Errorf("parseAndAddBoolRowSet: unexpected key length on inverted index, expected 1: got %d", len(k))
}

if err := agg.AddBoolRow(k, uint64(len(v))); err != nil {
Expand All @@ -112,8 +123,7 @@ func (ua unfilteredAggregator) parseAndAddBoolRowSet(agg *boolAggregator, k []by
func (ua unfilteredAggregator) parseAndAddBoolRowRoaringSet(agg *boolAggregator, k []byte, v *sroar.Bitmap) error {
if len(k) != 1 {
// we expect to see a single byte for a marshalled bool
return fmt.Errorf("unexpected key length on inverted index, "+
"expected 1: got %d", len(k))
return fmt.Errorf("parseAndAddBoolRowRoaringSet: unexpected key length on inverted index, expected 1: got %d", len(k))
}

if err := agg.AddBoolRow(k, uint64(v.GetCardinality())); err != nil {
Expand Down Expand Up @@ -152,9 +162,9 @@ func (ua unfilteredAggregator) floatProperty(ctx context.Context,
NumericalAggregations: map[string]interface{}{},
}

b := ua.store.Bucket(helpers.BucketFromPropNameLSM(prop.Name.String()))
if b == nil {
return nil, errors.Errorf("could not find bucket for prop %s", prop.Name)
b, err := lsmkv.FetchMeABucket(ua.store, "filterable_properties", helpers.BucketFromPropertyNameLSM(prop.Name.String()), prop.Name.String(), ua.propertyIds)
if err != nil {
return nil, errors.Errorf("could not create proxy bucket for prop %s: %v", prop.Name, err)
}

agg := newNumericalAggregator()
Expand All @@ -165,6 +175,10 @@ func (ua unfilteredAggregator) floatProperty(ctx context.Context,
defer c.Close()

for k, v := c.First(); k != nil; k, v = c.Next() {
if !helpers.MatchesPropertyKey(b.PropertyPrefix(), k) {
continue
}
k = helpers.UnMakePropertyKey(b.PropertyPrefix(), k)
if err := ua.parseAndAddFloatRowRoaringSet(agg, k, v); err != nil {
return nil, err
}
Expand All @@ -174,6 +188,10 @@ func (ua unfilteredAggregator) floatProperty(ctx context.Context,
defer c.Close()

for k, v := c.First(); k != nil; k, v = c.Next() {
if !helpers.MatchesPropertyKey(b.PropertyPrefix(), k) {
continue
}
k = helpers.UnMakePropertyKey(b.PropertyPrefix(), k)
if err := ua.parseAndAddFloatRowSet(agg, k, v); err != nil {
return nil, err
}
Expand All @@ -193,9 +211,9 @@ func (ua unfilteredAggregator) intProperty(ctx context.Context,
NumericalAggregations: map[string]interface{}{},
}

b := ua.store.Bucket(helpers.BucketFromPropNameLSM(prop.Name.String()))
if b == nil {
return nil, errors.Errorf("could not find bucket for prop %s", prop.Name)
b, err := lsmkv.FetchMeABucket(ua.store, "filterable_properties", helpers.BucketFromPropertyNameLSM(prop.Name.String()), prop.Name.String(), ua.propertyIds)
if err != nil {
return nil, errors.Errorf("could not create proxy bucket for prop %s: %v", prop.Name, err)
}

agg := newNumericalAggregator()
Expand All @@ -206,6 +224,10 @@ func (ua unfilteredAggregator) intProperty(ctx context.Context,
defer c.Close()

for k, v := c.First(); k != nil; k, v = c.Next() {
if !helpers.MatchesPropertyKey(b.PropertyPrefix(), k) {
continue
}
k = helpers.UnMakePropertyKey(b.PropertyPrefix(), k)
if err := ua.parseAndAddIntRowRoaringSet(agg, k, v); err != nil {
return nil, err
}
Expand All @@ -216,6 +238,10 @@ func (ua unfilteredAggregator) intProperty(ctx context.Context,
defer c.Close()

for k, v := c.First(); k != nil; k, v = c.Next() {
if !helpers.MatchesPropertyKey(b.PropertyPrefix(), k) {
continue
}
k = helpers.UnMakePropertyKey(b.PropertyPrefix(), k)
if err := ua.parseAndAddIntRowSet(agg, k, v); err != nil {
return nil, err
}
Expand All @@ -235,9 +261,9 @@ func (ua unfilteredAggregator) dateProperty(ctx context.Context,
DateAggregations: map[string]interface{}{},
}

b := ua.store.Bucket(helpers.BucketFromPropNameLSM(prop.Name.String()))
if b == nil {
return nil, errors.Errorf("could not find bucket for prop %s", prop.Name)
b, err := lsmkv.FetchMeABucket(ua.store, "filterable_properties", helpers.BucketFromPropertyNameLSM(prop.Name.String()), prop.Name.String(), ua.propertyIds)
if err != nil {
return nil, errors.Errorf("could not create proxy bucket for prop %s: %v", prop.Name, err)
}

agg := newDateAggregator()
Expand All @@ -247,7 +273,11 @@ func (ua unfilteredAggregator) dateProperty(ctx context.Context,
c := b.CursorRoaringSet()
defer c.Close()

for k, v := c.First(); k != nil; k, v = c.Next() {
for key, v := c.First(); key != nil; key, v = c.Next() {
if !helpers.MatchesPropertyKey(b.PropertyPrefix(), key) {
continue
}
k := helpers.UnMakePropertyKey(b.PropertyPrefix(), key)
if err := ua.parseAndAddDateRowRoaringSet(agg, k, v); err != nil {
return nil, err
}
Expand All @@ -257,6 +287,10 @@ func (ua unfilteredAggregator) dateProperty(ctx context.Context,
defer c.Close()

for k, v := c.First(); k != nil; k, v = c.Next() {
if !helpers.MatchesPropertyKey(b.PropertyPrefix(), k) {
continue
}
k = helpers.UnMakePropertyKey(b.PropertyPrefix(), k)
if err := ua.parseAndAddDateRowSet(agg, k, v); err != nil {
return nil, err
}
Expand All @@ -273,8 +307,7 @@ func (ua unfilteredAggregator) parseAndAddDateRowSet(agg *dateAggregator, k []by
) error {
if len(k) != 8 {
// dates are stored as epoch nanoseconds, we expect to see an int64
return fmt.Errorf("unexpected key length on inverted index, "+
"expected 8: got %d", len(k))
return fmt.Errorf("parseAndAddDateRowSet: unexpected key length on inverted index, expected 8: got %d", len(k))
}

if err := agg.AddTimestampRow(k, uint64(len(v))); err != nil {
Expand All @@ -289,8 +322,7 @@ func (ua unfilteredAggregator) parseAndAddDateRowRoaringSet(agg *dateAggregator,
) error {
if len(k) != 8 {
// dates are stored as epoch nanoseconds, we expect to see an int64
return fmt.Errorf("unexpected key length on inverted index, "+
"expected 8: got %d", len(k))
return fmt.Errorf("parseAndAddDateRowRoaringSet: unexpected key length on inverted index, expected 8: got %d (%v,%v)", len(k), k, string(k))
}

if err := agg.AddTimestampRow(k, uint64(v.GetCardinality())); err != nil {
Expand Down Expand Up @@ -319,6 +351,9 @@ func (ua unfilteredAggregator) dateArrayProperty(ctx context.Context,
defer c.Close()

for k, v := c.First(); k != nil; k, v = c.Next() {
if !helpers.MatchesPropertyKey(b.PropertyPrefix(), k) {
continue
}
if err := ua.parseAndAddDateArrayRow(agg, v, prop.Name); err != nil {
return nil, err
}
Expand Down Expand Up @@ -356,8 +391,7 @@ func (ua unfilteredAggregator) parseAndAddFloatRowSet(agg *numericalAggregator,
if len(k) != 8 {
// we expect to see either an int64 or a float64, so any non-8 length
// is unexpected
return fmt.Errorf("unexpected key length on inverted index, "+
"expected 8: got %d", len(k))
return fmt.Errorf("parseAndAddFloatRowSet: unexpected key length on inverted index, expected 8: got %d", len(k))
}

if err := agg.AddFloat64Row(k, uint64(len(v))); err != nil {
Expand All @@ -373,8 +407,7 @@ func (ua unfilteredAggregator) parseAndAddFloatRowRoaringSet(agg *numericalAggre
if len(k) != 8 {
// we expect to see either an int64 or a float64, so any non-8 length
// is unexpected
return fmt.Errorf("unexpected key length on inverted index, "+
"expected 8: got %d", len(k))
return fmt.Errorf("parseAndAddFloatRowRoaringSet: unexpected key length on inverted index, expected 8: got %d", len(k))
}

if err := agg.AddFloat64Row(k, uint64(v.GetCardinality())); err != nil {
Expand All @@ -390,8 +423,7 @@ func (ua unfilteredAggregator) parseAndAddIntRowSet(agg *numericalAggregator, k
if len(k) != 8 {
// we expect to see either an int64 or a float64, so any non-8 length
// is unexpected
return fmt.Errorf("unexpected key length on inverted index, "+
"expected 8: got %d", len(k))
return fmt.Errorf("parseAndAddIntRowSet: unexpected key length on inverted index, expected 8: got %d", len(k))
}

if err := agg.AddInt64Row(k, uint64(len(v))); err != nil {
Expand All @@ -407,8 +439,7 @@ func (ua unfilteredAggregator) parseAndAddIntRowRoaringSet(agg *numericalAggrega
if len(k) != 8 {
// we expect to see either an int64 or a float64, so any non-8 length
// is unexpected
return fmt.Errorf("unexpected key length on inverted index, "+
"expected 8: got %d", len(k))
return fmt.Errorf("parseAndAddIntRowRoaringSet: unexpected key length on inverted index, expected 8: got %d", len(k))
}

if err := agg.AddInt64Row(k, uint64(v.GetCardinality())); err != nil {
Expand Down Expand Up @@ -463,6 +494,10 @@ func (ua unfilteredAggregator) textProperty(ctx context.Context,
defer c.Close()

for k, v := c.First(); k != nil; k, v = c.Next() {
if !helpers.MatchesPropertyKey(b.PropertyPrefix(), k) {
continue
}

if err := ua.parseAndAddTextRow(agg, v, prop.Name); err != nil {
return nil, err
}
Expand Down Expand Up @@ -492,6 +527,10 @@ func (ua unfilteredAggregator) numberArrayProperty(ctx context.Context,
defer c.Close()

for k, v := c.First(); k != nil; k, v = c.Next() {
if !helpers.MatchesPropertyKey(b.PropertyPrefix(), k) {
continue
}

if err := ua.parseAndAddNumberArrayRow(agg, v, prop.Name); err != nil {
return nil, err
}
Expand Down