Skip to content

Commit

Permalink
v0.37.x:state/kvindexer: port 0.34 query fix (#77)
Browse files Browse the repository at this point in the history
* state/kvindexer: associate event attributes with events (#9759)
Co-authored-by: Anca Zamfir <[email protected]>
Co-authored-by: Sergio Mena <[email protected]>
Co-authored-by: Romain Ruetschi <[email protected]>
Co-authored-by: Thane Thomson <[email protected]>

* Backport kvindexer fix

Signed-off-by: Thane Thomson <[email protected]>
Co-authored-by: Callum Waters <[email protected]>
Co-authored-by: Thane Thomson <[email protected]>

* By event search is now default behaviour. Including fixes from PRs added to 0.34

Co-authored-by: Lasaro <[email protected]>
  • Loading branch information
jmalicevic and lasarojc committed Feb 8, 2023
1 parent d1defd9 commit 21c00c5
Show file tree
Hide file tree
Showing 13 changed files with 1,049 additions and 105 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- `[state/kvindexer]` \#77 Fixed the default behaviour of the kvindexer to index and query attributes by events in which they occur. In 0.34.25 this was mitigated by a separated RPC flag. (@jmalicevic)
81 changes: 80 additions & 1 deletion abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,20 @@ type Application struct {
state State
RetainBlocks int64 // blocks to retain after commit (via ResponseCommit.RetainHeight)
txToRemove map[string]struct{}
// If true, the app will generate block events in BeginBlock. Used to test the event indexer
// Should be false by default to avoid generating too much data.
genBlockEvents bool
}

func NewApplication() *Application {
state := loadState(dbm.NewMemDB())
return &Application{state: state}
}

func (app *Application) SetGenBlockEvents() {
app.genBlockEvents = true
}

func (app *Application) Info(req types.RequestInfo) (resInfo types.ResponseInfo) {
return types.ResponseInfo{
Data: fmt.Sprintf("{\"size\":%v}", app.state.Size),
Expand Down Expand Up @@ -116,6 +123,15 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli
{Key: "noindex_key", Value: "index is working", Index: false},
},
},
{
Type: "app",
Attributes: []types.EventAttribute{
{Key: "creator", Value: "Cosmoshi", Index: true},
{Key: "key", Value: value, Index: true},
{Key: "index_key", Value: "index is working", Index: true},
{Key: "noindex_key", Value: "index is working", Index: false},
},
},
}

return types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: events}
Expand Down Expand Up @@ -189,7 +205,70 @@ func (app *Application) Query(reqQuery types.RequestQuery) (resQuery types.Respo

func (app *Application) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginBlock {
app.txToRemove = map[string]struct{}{}
return types.ResponseBeginBlock{}
response := types.ResponseBeginBlock{}

if !app.genBlockEvents {
return response
}

if app.state.Height%2 == 0 {
response = types.ResponseBeginBlock{
Events: []types.Event{
{
Type: "begin_event",
Attributes: []types.EventAttribute{
{
Key: "foo",
Value: "100",
Index: true,
},
{
Key: "bar",
Value: "200",
Index: true,
},
},
},
{
Type: "begin_event",
Attributes: []types.EventAttribute{
{
Key: "foo",
Value: "200",
Index: true,
},
{
Key: "bar",
Value: "300",
Index: true,
},
},
},
},
}
} else {
response = types.ResponseBeginBlock{
Events: []types.Event{
{
Type: "begin_event",
Attributes: []types.EventAttribute{
{
Key: "foo",
Value: "400",
Index: true,
},
{
Key: "bar",
Value: "300",
Index: true,
},
},
},
},
}
}

return response
}

func (app *Application) ProcessProposal(
Expand Down
4 changes: 4 additions & 0 deletions abci/example/kvstore/persistent_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func NewPersistentKVStoreApplication(dbDir string) *PersistentKVStoreApplication
}
}

func (app *PersistentKVStoreApplication) SetGenBlockEvents() {
app.app.genBlockEvents = true
}

func (app *PersistentKVStoreApplication) SetLogger(l log.Logger) {
app.logger = l
}
Expand Down
80 changes: 73 additions & 7 deletions docs/app-dev/indexing-transactions.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ the block itself is never stored.
Each event contains a type and a list of attributes, which are key-value pairs
denoting something about what happened during the method's execution. For more
details on `Events`, see the

[ABCI](https://github.com/cometbft/cometbft/blob/v0.37.x/spec/abci/abci++_basic_concepts.md#events)

documentation.

An `Event` has a composite key associated with it. A `compositeKey` is
Expand All @@ -34,6 +36,9 @@ would be equal to the composite key of `jack.account.number`.
By default, CometBFT will index all transactions by their respective hashes
and height and blocks by their height.

CometBFT allows for different events within the same height to have
equal attributes.

## Configuration

Operators can configure indexing via the `[tx_index]` section. The `indexer`
Expand Down Expand Up @@ -67,6 +72,60 @@ for block and transaction events directly against CometBFT's RPC. However, the
query syntax is limited and so this indexer type might be deprecated or removed
entirely in the future.

**Implementation and data layout**

The kv indexer stores each attribute of an event individually, by creating a composite key
with
- event type,
- attribute key,
- attribute value,
- event generator (e.g. `EndBlock` and `BeginBlock`)
- the height, and
- event counter.
For example the following events:

```
Type: "transfer",
Attributes: []abci.EventAttribute{
{Key: "sender", Value: "Bob", Index: true},
{Key: "recipient", Value: "Alice", Index: true},
{Key: "balance", Value: "100", Index: true},
{Key: "note", Value: "nothing", Index: true},
},
```

```
Type: "transfer",
Attributes: []abci.EventAttribute{
{Key: "sender", Value: "Tom", Index: true},
{Key: "recipient", Value: "Alice", Index: true},
{Key: "balance", Value: "200", Index: true},
{Key: "note", Value: "nothing", Index: true},
},
```

will be represented as follows in the store, assuming these events result from the EndBlock call for height 1:

```
Key value
---- event1 ------
transferSenderBobEndBlock11 1
transferRecipientAliceEndBlock11 1
transferBalance100EndBlock11 1
transferNodeNothingEndblock11 1
---- event2 ------
transferSenderTomEndBlock12 1
transferRecepientAliceEndBlock12 1
transferBalance200EndBlock12 1
transferNodeNothingEndblock12 1
```
The event number is a local variable kept by the indexer and incremented when a new event is processed.
It is an `int64` variable and has no other semantics besides being used to associate attributes belonging to the same events within a height.
This variable is not atomically incremented as event indexing is deterministic. **Should this ever change**, the event id generation
will be broken.

#### PostgreSQL

The `psql` indexer type allows an operator to enable block and transaction event
Expand Down Expand Up @@ -122,10 +181,10 @@ func (app *KVStoreApplication) DeliverTx(req types.RequestDeliverTx) types.Resul
{
Type: "transfer",
Attributes: []abci.EventAttribute{
{Key: []byte("sender"), Value: []byte("Bob"), Index: true},
{Key: []byte("recipient"), Value: []byte("Alice"), Index: true},
{Key: []byte("balance"), Value: []byte("100"), Index: true},
{Key: []byte("note"), Value: []byte("nothing"), Index: true},
{Key: "sender ", Value: "Bob ", Index: true},
{Key: "recipient ", Value: "Alice ", Index: true},
{Key: "balance ", Value: "100 ", Index: true},
{Key: "note ", Value: "nothing ", Index: true},
},
},
}
Expand Down Expand Up @@ -168,7 +227,7 @@ a query to `/subscribe` RPC endpoint.
Check out [API docs](https://docs.cometbft.com/v0.37/rpc/#subscribe) for more information
on query syntax and other options.

## Querying Blocks Events
## Querying Block Events

You can query for a paginated set of blocks by their events by calling the
`/block_search` RPC endpoint:
Expand All @@ -177,5 +236,12 @@ You can query for a paginated set of blocks by their events by calling the
curl "localhost:26657/block_search?query=\"block.height > 10 AND val_set.num_changed > 0\""
```

Check out [API docs](https://docs.cometbft.com/v0.37/rpc/#/Info/block_search)
for more information on query syntax and other options.

Storing the event sequence was introduced in CometBFT 0.34.26. Before that, up until Tendermint Core 0.34.26,
the event sequence was not stored in the kvstore and events were stored only by height. That means that queries
returned blocks and transactions whose event attributes match within the height but can match across different
events on that height.
This behavior was fixed with CometBFT 0.34.26+. However, if the data was indexed with earlier versions of
Tendermint Core and not re-indexed, that data will be queried as if all the attributes within a height
occurred within the same event.

2 changes: 2 additions & 0 deletions rpc/client/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ func TestMain(m *testing.M) {
}

app := kvstore.NewPersistentKVStoreApplication(dir)
// If testing block event generation
// app.SetGenBlockEvents() needs to be called here
node = rpctest.StartTendermint(app)

code := m.Run()
Expand Down
30 changes: 26 additions & 4 deletions rpc/client/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,27 @@ func TestTxSearchWithTimeout(t *testing.T) {
require.Greater(t, len(result.Txs), 0, "expected a lot of transactions")
}

// This test does nothing if we do not call app.SetGenBlockEvents() within main_test.go
// It will nevertheless pass as there are no events being generated.
func TestBlockSearch(t *testing.T) {
c := getHTTPClient()

// first we broadcast a few txs
for i := 0; i < 10; i++ {
_, _, tx := MakeTxKV()

_, err := c.BroadcastTxCommit(context.Background(), tx)
require.NoError(t, err)
}
require.NoError(t, client.WaitForHeight(c, 5, nil))
// This cannot test match_events as it calls the client BlockSearch function directly
// It is the RPC request handler that processes the match_event
result, err := c.BlockSearch(context.Background(), "begin_event.foo = 100 AND begin_event.bar = 300", nil, nil, "asc")
require.NoError(t, err)
blockCount := len(result.Blocks)
require.Equal(t, blockCount, 0)

}
func TestTxSearch(t *testing.T) {
c := getHTTPClient()

Expand All @@ -536,8 +557,7 @@ func TestTxSearch(t *testing.T) {
find := result.Txs[len(result.Txs)-1]
anotherTxHash := types.Tx("a different tx").Hash()

for i, c := range GetClients() {
t.Logf("client %d", i)
for _, c := range GetClients() {

// now we query for the tx.
result, err := c.TxSearch(context.Background(), fmt.Sprintf("tx.hash='%v'", find.Hash), true, nil, nil, "asc")
Expand Down Expand Up @@ -616,16 +636,17 @@ func TestTxSearch(t *testing.T) {
pages = int(math.Ceil(float64(txCount) / float64(perPage)))
)

totalTx := 0
for page := 1; page <= pages; page++ {
page := page
result, err := c.TxSearch(context.Background(), "tx.height >= 1", false, &page, &perPage, "asc")
result, err := c.TxSearch(context.Background(), "tx.height >= 1", true, &page, &perPage, "asc")
require.NoError(t, err)
if page < pages {
require.Len(t, result.Txs, perPage)
} else {
require.LessOrEqual(t, len(result.Txs), perPage)
}
require.Equal(t, txCount, result.TotalCount)
totalTx = totalTx + len(result.Txs)
for _, tx := range result.Txs {
require.False(t, seen[tx.Height],
"Found duplicate height %v in page %v", tx.Height, page)
Expand All @@ -635,6 +656,7 @@ func TestTxSearch(t *testing.T) {
maxHeight = tx.Height
}
}
require.Equal(t, txCount, totalTx)
require.Len(t, seen, txCount)
}
}
Expand Down
Loading

0 comments on commit 21c00c5

Please sign in to comment.