Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: adds comments to describe the mongo parser #1818

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/core/proxy/integrations/mongo/command.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Package mongo provides functionality for working with MongoDB outgoing calls.
package mongo

// This file contains code from the coinbase mongobetween
// https://github.com/coinbase/mongobetween/blob/1034c5a0c3f10cb1dd84af2981bc55ea1d3b45c0/mongo/command.go#L10
import (
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
)
Expand Down
25 changes: 24 additions & 1 deletion pkg/core/proxy/integrations/mongo/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"go.uber.org/zap"
)

// decodeMongo decodes the mongo wire message from the client connection
// and sends the response back to the client.
func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientConn net.Conn, dstCfg *integrations.ConditionalDstCfg, mockDb integrations.MockMemDb, opts models.OutgoingOptions) error {
startedDecoding := time.Now()
requestBuffers := [][]byte{reqBuf}
Expand All @@ -29,17 +31,20 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
defer close(errCh)
var readRequestDelay time.Duration
for {
// get the config mocks from the mockDb for responding to heartbeat calls
configMocks, err := mockDb.GetUnFilteredMocks()
if err != nil {
utils.LogError(logger, err, "error while getting config mock")
}
logger.Debug(fmt.Sprintf("the config mocks are: %v", configMocks))

var (
mongoRequests []models.MongoRequest
mongoRequests []models.MongoRequest // stores the request packet
)
// check to read the request buffer from the client connection after the initial packeyt
if string(reqBuf) == "read form client conn" {
started := time.Now()
// reads the first chunk of the mongo request
reqBuf, err = util.ReadBytes(ctx, logger, clientConn)
if err != nil {
if err == io.EOF {
Expand All @@ -60,6 +65,7 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
return
}
logger.Debug(fmt.Sprintf("the loop starts with the time delay: %v", time.Since(startedDecoding)))
// convert the request buffer to the mongo wire message in the go struct
opReq, requestHeader, mongoRequest, err := Decode(reqBuf, logger)
if err != nil {
utils.LogError(logger, err, "failed to decode the mongo wire message from the client")
Expand All @@ -71,10 +77,13 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
Message: mongoRequest,
ReadDelay: int64(readRequestDelay),
})
// check for the more_to_come flag bit in the mongo request
// header to read the next chunks of the request
if val, ok := mongoRequest.(*models.MongoOpMessage); ok && hasSecondSetBit(val.FlagBits) {
for {
started := time.Now()
logger.Debug("into the for loop for request stream")
// reads the next chunk of the mongo request
requestBuffer1, err := util.ReadBytes(ctx, logger, clientConn)
if err != nil {
if err == io.EOF {
Expand All @@ -93,6 +102,7 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
logger.Debug("the response from the server is complete")
break
}
// convert the request buffer to the mongo response wire message in the go struct
_, reqHeader, mongoReq, err := Decode(requestBuffer1, logger)
if err != nil {
utils.LogError(logger, err, "failed to decode the mongo wire message from the mongo client")
Expand All @@ -110,19 +120,24 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
})
}
}
// check for the heartbeat request from client and use the config mocks to respond
if isHeartBeat(logger, opReq, *mongoRequests[0].Header, mongoRequests[0].Message) {
logger.Debug("recieved a heartbeat request for mongo", zap.Any("config mocks", len(configMocks)))
maxMatchScore := 0.0
bestMatchIndex := -1
// loop over the recorded config mocks to match with the incoming request
for configIndex, configMock := range configMocks {
logger.Debug("the config mock is: ", zap.Any("config mock", configMock), zap.Any("actual request", mongoRequests))
// checking the number of chunks for recorded config mocks and the incoming request
if len(configMock.Spec.MongoRequests) == len(mongoRequests) {
for i, req := range configMock.Spec.MongoRequests {
// check the opcode of the incoming request and the recorded config mocks
if len(configMock.Spec.MongoRequests) != len(mongoRequests) || req.Header.Opcode != mongoRequests[i].Header.Opcode {
continue
}
switch req.Header.Opcode {
case wiremessage.OpQuery:
// check the query fields of the incoming request and the recorded config mocks
expectedQuery := req.Message.(*models.MongoOpQuery)
actualQuery := mongoRequests[i].Message.(*models.MongoOpQuery)
if actualQuery.FullCollectionName != expectedQuery.FullCollectionName ||
Expand All @@ -133,6 +148,7 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
continue
}

// calculate the matching score for query bson dcouments of the incoming request and the recorded config mocks
expected := map[string]interface{}{}
actual := map[string]interface{}{}
err = bson.UnmarshalExtJSON([]byte(expectedQuery.Query), true, &expected)
Expand All @@ -153,13 +169,15 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
}

case wiremessage.OpMsg:
// check the OpMsg sections of the incoming request and the recorded config mocks
if req.Message.(*models.MongoOpMessage).FlagBits != mongoRequests[i].Message.(*models.MongoOpMessage).FlagBits {
continue
}
scoreSum := 0.0
if len(req.Message.(*models.MongoOpMessage).Sections) != len(mongoRequests[i].Message.(*models.MongoOpMessage).Sections) {
continue
}
// calculate the matching score for each section of the incoming request and the recorded config mocks
for sectionIndx, section := range req.Message.(*models.MongoOpMessage).Sections {
if len(req.Message.(*models.MongoOpMessage).Sections) == len(mongoRequests[i].Message.(*models.MongoOpMessage).Sections) {
score := compareOpMsgSection(logger, section, mongoRequests[i].Message.(*models.MongoOpMessage).Sections[sectionIndx])
Expand Down Expand Up @@ -190,6 +208,7 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
errCh <- err
return
}
// write the mongo response to the client connection from the recorded config mocks that most matches the incoming request
for _, mongoResponse := range configMocks[bestMatchIndex].Spec.MongoResponses {
switch mongoResponse.Header.Opcode {
case wiremessage.OpReply:
Expand Down Expand Up @@ -238,6 +257,9 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
}
}
} else {
// handle for the non-heartbeat request from the client

// match the incoming request with the recorded tcsMocks and return a mocked response which matches most with incoming request
matched, matchedMock, err := match(ctx, logger, mongoRequests, mockDb)
if err != nil {
errCh <- err
Expand All @@ -258,6 +280,7 @@ func decodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientC
responseTo := mongoRequests[0].Header.RequestID
logger.Debug("the mock matched with the current request", zap.Any("mock", matchedMock), zap.Any("responseTo", responseTo))

// write the mongo response to the client connection from the recorded tcsMocks that most matches the incoming request
for _, resp := range matchedMock.Spec.MongoResponses {
respMessage := resp.Message.(*models.MongoOpMessage)
var expectedRequestSections []string
Expand Down
48 changes: 28 additions & 20 deletions pkg/core/proxy/integrations/mongo/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
"go.uber.org/zap"
)

// encodeMongo records the outgoing mongo messages of the client connection,
// decodes the wiremessage binary and writes readable string
// to the yaml file.
func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientConn, destConn net.Conn, mocks chan<- *models.Mock, _ models.OutgoingOptions) error {

errCh := make(chan error, 1)
Expand All @@ -32,11 +35,10 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
for {
var err error
var readRequestDelay time.Duration
// var logStr string = fmt.Sprintln("the conn id: ", clientConnId, " the destination conn id: ", destConnId)

// logStr += fmt.Sprintln("started reading from the client: ", started)
// reads the request packets from the client connection after the first request packet.
// Since, that is already read in the RecordOutgoing function.
if string(reqBuf) == "read form client conn" {
// lstr := ""
started := time.Now()
reqBuf, err = pUtil.ReadBytes(ctx, logger, clientConn)
logger.Debug("reading from the mongo conn", zap.Any("", string(reqBuf)))
Expand All @@ -56,9 +58,10 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
}

var (
mongoRequests []models.MongoRequest
mongoResponses []models.MongoResponse
mongoRequests []models.MongoRequest // stores the decoded binary packets for a request
mongoResponses []models.MongoResponse // stores the decoded binary packets for a response
)
// decode the binary packet and store the values in the corresponding struct
opReq, requestHeader, mongoRequest, err := Decode(reqBuf, logger)
if err != nil {
utils.LogError(logger, err, "failed to decode the mongo wire message from the client")
Expand All @@ -71,6 +74,7 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
Message: mongoRequest,
ReadDelay: int64(readRequestDelay),
})
// forwards the request packet to the destination server
_, err = destConn.Write(reqBuf)
if err != nil {
if ctx.Err() != nil {
Expand All @@ -82,12 +86,11 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
}
logger.Debug(fmt.Sprintf("the request in the mongo parser after passing to dest: %v", len(reqBuf)))

// logStr += fmt.Sprintln("after writing the request to the destination: ", time.Since(started))
// check for the request packet streaming for the mongo wire message
if val, ok := mongoRequest.(*models.MongoOpMessage); ok && hasSecondSetBit(val.FlagBits) {
for {
// read the streaming request packets
requestBuffer1, err := pUtil.ReadBytes(ctx, logger, clientConn)

// logStr += tmpStr
if err != nil {
if err == io.EOF {
logger.Debug("recieved request buffer is empty in record mode for mongo request")
Expand All @@ -99,7 +102,7 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
return nil
}

// write the reply to mongo client
// forward the request packet to the destination server
_, err = destConn.Write(requestBuffer1)
if err != nil {
if ctx.Err() != nil {
Expand All @@ -110,12 +113,12 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
return nil
}

// logStr += fmt.Sprintln("after writting response to the client: ", time.Since(started), "current time is: ", time.Now())

if len(requestBuffer1) == 0 {
logger.Debug("the response from the server is complete")
break
}
// decode the binary packet and return the values in the corresponding structs
// for header and message.
_, reqHeader, mongoReq, err := Decode(requestBuffer1, logger)
if err != nil {
utils.LogError(logger, err, "failed to decode the mongo wire message from the destination server")
Expand All @@ -134,10 +137,10 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
}
}

// read reply message from the mongo server
// tmpStr := ""
reqTimestampMock := time.Now()
started := time.Now()

// read reply message length from the destination mongo server
responsePckLengthBuffer, err := pUtil.ReadRequiredBytes(ctx, logger, destConn, 4)
if err != nil {
if err == io.EOF {
Expand All @@ -152,16 +155,17 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by

logger.Debug("recieved these pck length packets", zap.Any("packets", responsePckLengthBuffer))

// convert packet length to LittleEndian integer.
pckLength := getPacketLength(responsePckLengthBuffer)
logger.Debug("received pck length ", zap.Any("packet length", pckLength))

// read the entire response packet
responsePckDataBuffer, err := pUtil.ReadRequiredBytes(ctx, logger, destConn, int(pckLength)-4)

logger.Debug("recieved these packets", zap.Any("packets", responsePckDataBuffer))

responseBuffer := append(responsePckLengthBuffer, responsePckDataBuffer...)
logger.Debug("reading from the destination mongo server", zap.Any("", string(responseBuffer)))
// logStr += tmpStr
if err != nil {
if err == io.EOF {
logger.Debug("recieved response buffer is empty in record mode for mongo call")
Expand All @@ -174,7 +178,7 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
}
readResponseDelay := time.Since(started)

// write the reply to mongo client
// write the response packet to mongo client
_, err = clientConn.Write(responseBuffer)
if err != nil {
if ctx.Err() != nil {
Expand All @@ -185,8 +189,7 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
return nil
}

// logStr += fmt.Sprintln("after writting response to the client: ", time.Since(started), "current time is: ", time.Now())

// decode the binary packet of response and return the values in the corresponding structs
_, responseHeader, mongoResponse, err := Decode(responseBuffer, logger)
if err != nil {
utils.LogError(logger, err, "failed to decode the mongo wire message from the destination server")
Expand All @@ -198,14 +201,17 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
Message: mongoResponse,
ReadDelay: int64(readResponseDelay),
})
// check for the response packet streaming for the mongo wire message
if val, ok := mongoResponse.(*models.MongoOpMessage); ok && hasSecondSetBit(val.FlagBits) {
for i := 0; ; i++ {
// handle the streaming response packets for heartbeat calls
if i == 0 && isHeartBeat(logger, opReq, *mongoRequests[0].Header, mongoRequests[0].Message) {
m.recordMessage(ctx, logger, mongoRequests, mongoResponses, opReq, reqTimestampMock, mocks)
}
started = time.Now()

// read the response packets from the destination server
responseBuffer, err = pUtil.ReadBytes(ctx, logger, destConn)
// logStr += tmpStr
if err != nil {
if err == io.EOF {
logger.Debug("recieved response buffer is empty in record mode for mongo call")
Expand All @@ -232,12 +238,11 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
}
logger.Debug(fmt.Sprintf("the response in the mongo parser after passing to client: %v", len(responseBuffer)))

// logStr += fmt.Sprintln("after writting response to the client: ", time.Since(started), "current time is: ", time.Now())

if len(responseBuffer) == 0 {
logger.Debug("the response from the server is complete")
break
}
// decode the binary packet for response and return the values in the corresponding structs
_, respHeader, mongoResp, err := Decode(responseBuffer, logger)
if err != nil {
utils.LogError(logger, err, "failed to decode the mongo wire message from the destination server")
Expand All @@ -256,7 +261,9 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
}
}

// write the response packet to the yaml file
m.recordMessage(ctx, logger, mongoRequests, mongoResponses, opReq, reqTimestampMock, mocks)
// assigns "read form client conn" to the reqBuf to read the next request packet from the client connection
reqBuf = []byte("read form client conn")
}
})
Expand All @@ -272,6 +279,7 @@ func (m *Mongo) encodeMongo(ctx context.Context, logger *zap.Logger, reqBuf []by
}
}

// getPacketLength returns the length of the packet from the first 4 bytes of the packet.
func getPacketLength(src []byte) (length int32) {
length = int32(src[0]) | int32(src[1])<<8 | int32(src[2])<<16 | int32(src[3])<<24
return length
Expand Down
4 changes: 4 additions & 0 deletions pkg/core/proxy/integrations/mongo/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.uber.org/zap"
)

// match mathces and returns the best matching mock for the incoming mongo requests.
func match(ctx context.Context, logger *zap.Logger, mongoRequests []models.MongoRequest, mockDb integrations.MockMemDb) (bool, *models.Mock, error) {
for {
select {
Expand All @@ -27,15 +28,18 @@ func match(ctx context.Context, logger *zap.Logger, mongoRequests []models.Mongo
}
maxMatchScore := 0.0
bestMatchIndex := -1
// iterate over the tcsMocks and compare the incoming mongo requests with the recorded mongo requests.
for tcsIndx, tcsMock := range tcsMocks {
if ctx.Err() != nil {
return false, nil, ctx.Err()
}
// check for the number of chunks in the incoming mongo requests and the recorded mongo requests.
if len(tcsMock.Spec.MongoRequests) == len(mongoRequests) {
for i, req := range tcsMock.Spec.MongoRequests {
if ctx.Err() != nil {
return false, nil, ctx.Err()
}
// check for the opcode of the incoming mongo requests and the recorded mongo requests.
if len(tcsMock.Spec.MongoRequests) != len(mongoRequests) || req.Header.Opcode != mongoRequests[i].Header.Opcode {
logger.Debug("the recieved request is not of same type with the tcmocks", zap.Any("at index", tcsIndx))
continue
Expand Down
Loading
Loading