Skip to content

Commit

Permalink
e2e: test runner generates loadtime formatted transactions. (#9779)
Browse files Browse the repository at this point in the history
  • Loading branch information
williambanfield committed Nov 30, 2022
1 parent 18d38dd commit 21b2801
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 78 deletions.
1 change: 0 additions & 1 deletion test/e2e/networks/simple.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,3 @@
[node.validator02]
[node.validator03]
[node.validator04]

9 changes: 9 additions & 0 deletions test/e2e/pkg/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ type Manifest struct {
ProcessProposalDelay time.Duration `toml:"process_proposal_delay"`
CheckTxDelay time.Duration `toml:"check_tx_delay"`
// TODO: add vote extension and finalize block delay (@cmwaters)

LoadTxSizeBytes int `toml:"load_tx_size_bytes"`
LoadTxBatchSize int `toml:"load_tx_batch_size"`
LoadTxConnections int `toml:"load_tx_connections"`
}

// ManifestNode represents a node in a testnet manifest.
Expand Down Expand Up @@ -145,6 +149,11 @@ type ManifestNode struct {
// pause: temporarily pauses (freezes) the node
// restart: restarts the node, shutting it down with SIGTERM
Perturb []string `toml:"perturb"`

// SendNoLoad determines if the e2e test should send load to this node.
// It defaults to false so unless the configured, the node will
// receive load.
SendNoLoad bool `toml:"send_no_laod"`

This comment has been minimized.

Copy link
@thanethomson

thanethomson Dec 1, 2022

Contributor

I just realized there's a spelling mistake here: should be send_no_load

}

// Save saves the testnet manifest to a file.
Expand Down
23 changes: 23 additions & 0 deletions test/e2e/pkg/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
const (
randomSeed int64 = 2308084734268
proxyPortFirst uint32 = 5701

defaultBatchSize = 2
defaultConnections = 1
defaultTxSizeBytes = 1024
)

type (
Expand Down Expand Up @@ -63,6 +67,9 @@ type Testnet struct {
Nodes []*Node
KeyType string
Evidence int
LoadTxSizeBytes int
LoadTxBatchSize int
LoadTxConnections int
ABCIProtocol string
PrepareProposalDelay time.Duration
ProcessProposalDelay time.Duration
Expand Down Expand Up @@ -92,6 +99,9 @@ type Node struct {
Seeds []*Node
PersistentPeers []*Node
Perturbations []Perturbation

// SendNoLoad determines if the e2e test should send load to this node.
SendNoLoad bool
}

// LoadTestnet loads a testnet from a manifest file, using the filename to
Expand Down Expand Up @@ -119,6 +129,9 @@ func LoadTestnet(manifest Manifest, fname string, ifd InfrastructureData) (*Test
ValidatorUpdates: map[int64]map[*Node]int64{},
Nodes: []*Node{},
Evidence: manifest.Evidence,
LoadTxSizeBytes: manifest.LoadTxSizeBytes,
LoadTxBatchSize: manifest.LoadTxBatchSize,
LoadTxConnections: manifest.LoadTxConnections,
ABCIProtocol: manifest.ABCIProtocol,
PrepareProposalDelay: manifest.PrepareProposalDelay,
ProcessProposalDelay: manifest.ProcessProposalDelay,
Expand All @@ -133,6 +146,15 @@ func LoadTestnet(manifest Manifest, fname string, ifd InfrastructureData) (*Test
if testnet.ABCIProtocol == "" {
testnet.ABCIProtocol = string(ProtocolBuiltin)
}
if testnet.LoadTxConnections == 0 {
testnet.LoadTxConnections = defaultConnections
}
if testnet.LoadTxBatchSize == 0 {
testnet.LoadTxBatchSize = defaultBatchSize
}
if testnet.LoadTxSizeBytes == 0 {
testnet.LoadTxSizeBytes = defaultTxSizeBytes
}

// Set up nodes, in alphabetical order (IPs and ports get same order).
nodeNames := []string{}
Expand Down Expand Up @@ -167,6 +189,7 @@ func LoadTestnet(manifest Manifest, fname string, ifd InfrastructureData) (*Test
SnapshotInterval: nodeManifest.SnapshotInterval,
RetainBlocks: nodeManifest.RetainBlocks,
Perturbations: []Perturbation{},
SendNoLoad: nodeManifest.SendNoLoad,
}
if node.StartAt == testnet.InitialHeight {
node.StartAt = 0 // normalize to 0 for initial nodes, since code expects this
Expand Down
137 changes: 74 additions & 63 deletions test/e2e/runner/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,48 @@ package main

import (
"context"
"crypto/rand"
"errors"
"fmt"
"math"
"sync"
"time"

"github.com/google/uuid"
"github.com/tendermint/tendermint/libs/log"
rpchttp "github.com/tendermint/tendermint/rpc/client/http"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
"github.com/tendermint/tendermint/test/loadtime/payload"
"github.com/tendermint/tendermint/types"
)

const workerPoolSize = 16

// Load generates transactions against the network until the given context is
// canceled. A multiplier of greater than one can be supplied if load needs to
// be generated beyond a minimum amount.
func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error {
// Since transactions are executed across all nodes in the network, we need
// to reduce transaction load for larger networks to avoid using too much
// CPU. This gives high-throughput small networks and low-throughput large ones.
// This also limits the number of TCP connections, since each worker has
// a connection to all nodes.
concurrency := 64 / len(testnet.Nodes)
if concurrency == 0 {
concurrency = 1
}
// canceled.
func Load(ctx context.Context, testnet *e2e.Testnet) error {
initialTimeout := 1 * time.Minute
stallTimeout := 30 * time.Second

chTx := make(chan types.Tx)
chSuccess := make(chan types.Tx)
chSuccess := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Spawn job generator and processors.
logger.Info("load", "msg", log.NewLazySprintf("Starting transaction load (%v workers)...", concurrency))
started := time.Now()
u := [16]byte(uuid.New()) // generate run ID on startup

go loadGenerate(ctx, chTx, multiplier)
txCh := make(chan types.Tx)
go loadGenerate(ctx, txCh, testnet, u[:])

for w := 0; w < concurrency; w++ {
go loadProcess(ctx, testnet, chTx, chSuccess)
for _, n := range testnet.Nodes {
if n.SendNoLoad {
continue
}

for w := 0; w < testnet.LoadTxConnections; w++ {
cli, err := n.Client()
if err != nil {
return err
}
go loadProcess(ctx, txCh, chSuccess, cli)
}
}

// Monitor successful transactions, and abort on stalls.
Expand All @@ -67,58 +68,68 @@ func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error {
}

// loadGenerate generates jobs until the context is canceled
func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int) {
for i := 0; i < math.MaxInt64; i++ {
// We keep generating the same 1000 keys over and over, with different values.
// This gives a reasonable load without putting too much data in the app.
id := i % 1000

bz := make([]byte, 1024) // 1kb hex-encoded
_, err := rand.Read(bz)
if err != nil {
panic(fmt.Sprintf("Failed to read random bytes: %v", err))
}
tx := types.Tx(fmt.Sprintf("load-%X=%x", id, bz))

func loadGenerate(ctx context.Context, txCh chan<- types.Tx, testnet *e2e.Testnet, id []byte) {
t := time.NewTimer(0)
defer t.Stop()
for {
select {
case chTx <- tx:
time.Sleep(time.Second / time.Duration(multiplier))

case <-t.C:
case <-ctx.Done():
close(chTx)
close(txCh)
return
}
t.Reset(time.Second)

// A context with a timeout is created here to time the createTxBatch
// function out. If createTxBatch has not completed its work by the time
// the next batch is set to be sent out, then the context is cancled so that
// the current batch is halted, allowing the next batch to begin.
tctx, cf := context.WithTimeout(ctx, time.Second)
createTxBatch(tctx, txCh, testnet, id)
cf()
}
}

// loadProcess processes transactions
func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- types.Tx) {
// Each worker gets its own client to each node, which allows for some
// concurrency while still bounding it.
clients := map[string]*rpchttp.HTTP{}

var err error
for tx := range chTx {
node := testnet.RandomNode()
client, ok := clients[node.Name]
if !ok {
client, err = node.Client()
if err != nil {
continue
}

// check that the node is up
_, err = client.Health(ctx)
if err != nil {
continue
// createTxBatch creates new transactions and sends them into the txCh. createTxBatch
// returns when either a full batch has been sent to the txCh or the context
// is canceled.
func createTxBatch(ctx context.Context, txCh chan<- types.Tx, testnet *e2e.Testnet, id []byte) {
wg := &sync.WaitGroup{}
for i := 0; i < workerPoolSize; i++ {
wg.Add(1)
go func() {
for i := 0; i < testnet.LoadTxBatchSize; i++ {
tx, err := payload.NewBytes(&payload.Payload{
Id: id,
Size: uint64(testnet.LoadTxSizeBytes),
Rate: uint64(testnet.LoadTxBatchSize),
Connections: uint64(testnet.LoadTxConnections),
})
if err != nil {
panic(fmt.Sprintf("Failed to generate tx: %v", err))
}

select {
case txCh <- tx:
case <-ctx.Done():
return
}
}
wg.Done()
}()
}
wg.Wait()
}

clients[node.Name] = client
}

// loadProcess processes transactions by sending transactions received on the txCh
// to the client.
func loadProcess(ctx context.Context, txCh <-chan types.Tx, chSuccess chan<- struct{}, client *rpchttp.HTTP) {
var err error
s := struct{}{}
for tx := range txCh {
if _, err = client.BroadcastTxSync(ctx, tx); err != nil {
continue
}
chSuccess <- tx
chSuccess <- s
}
}
18 changes: 4 additions & 14 deletions test/e2e/runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func NewCLI() *CLI {
ctx, loadCancel := context.WithCancel(context.Background())
defer loadCancel()
go func() {
err := Load(ctx, cli.testnet, 1)
err := Load(ctx, cli.testnet)
if err != nil {
logger.Error(fmt.Sprintf("Transaction load failed: %v", err.Error()))
}
Expand Down Expand Up @@ -216,20 +216,10 @@ func NewCLI() *CLI {
})

cli.root.AddCommand(&cobra.Command{
Use: "load [multiplier]",
Args: cobra.MaximumNArgs(1),
Use: "load",
Short: "Generates transaction load until the command is canceled",
RunE: func(cmd *cobra.Command, args []string) (err error) {
m := 1

if len(args) == 1 {
m, err = strconv.Atoi(args[0])
if err != nil {
return err
}
}

return Load(context.Background(), cli.testnet, m)
return Load(context.Background(), cli.testnet)
},
})

Expand Down Expand Up @@ -312,7 +302,7 @@ Does not run any perbutations.
ctx, loadCancel := context.WithCancel(context.Background())
defer loadCancel()
go func() {
err := Load(ctx, cli.testnet, 1)
err := Load(ctx, cli.testnet)
if err != nil {
logger.Error(fmt.Sprintf("Transaction load failed: %v", err.Error()))
}
Expand Down

0 comments on commit 21b2801

Please sign in to comment.