Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ChanReader / ChanWriter for use inside toxics #134

Open
wants to merge 10 commits into
base: dev
Choose a base branch
from

Conversation

xthexder
Copy link
Contributor

@xthexder xthexder commented Oct 4, 2016

While working on the bidirectional toxics PRs I discovered some problems with how ChanReader worked when used ephemerally inside a toxic.
These turned out to be non-trivial to fix, with the end result being this PR.

The problem with ChanReader is that in some cases it may buffer data. This means if you ever stop using the reader and switch to another, there's a possibility of losing data.
Further problems arise when ChanReader is read using other readers such as bufio.Reader as required by functions such as http.ReadRequest().

These problems are solved in 3 ways:

  1. Use a non-ephemeral reader object on the ToxicStub instead of creating it every time Pipe() is called.
  2. Create a TransactionalReader that can be rolled back to specific points so that bufio.Reader can effectively be un-read.
  3. Allow all buffers inside the reader to be explicitly flushed at any point so that no data is lost.

Example TransactionalReader usage:

// Input is "hello world"
txnReader := stream.NewTransactionalReader(input)

{ // This section is ephemeral (and would be similar to usage in `Pipe()`)
    bufReader := bufio.NewReader(txnReader)
    msg, _ := bufReader.ReadString(' ')
    // msg == "hello "
    // bufReader has "world" buffered (bufReader.Buffered() == 5)

    // Mark everything up to "hello " as read.
    txnReader.Checkpoint(-bufReader.Buffered())
    // Rollback txnReader to the checkpoint, we never read "world"
    txnReader.Rollback()
    // bufReader can now be thrown away
}

// Any further reads from txnReader will start from the checkpoint
buf := make([]byte, 5)
txnReader.Read(buf)
// buf == "world"

Actual usage can be found in my WIP redis toxic here: https://github.com/Shopify/toxiproxy/blob/redis-wip/toxics/redis.go

The documentation for this is pretty tricky to write without actual code examples, so I'll likely leave the detailed docs until we have at least 1 protocol-aware toxic to point at.

@sirupsen @jpittis / anyone else interested in Golang

@xthexder xthexder added this to the 2.2.0 milestone Dec 7, 2016
@xthexder xthexder changed the base branch from master to dev December 14, 2016 19:57
@xthexder
Copy link
Contributor Author

Went through this again and cleaned some things up now that it's not fresh in my mind.
Should be good for a review @sirupsen

return
} else if err == io.EOF {
stub.Close()
return
}
writer.Write(buf[:n])
stub.Writer.Write(buf[:n])
stub.Reader.Checkpoint(0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Checkpoint(0)? Should it not be Checkpoint(-stub.Reader.Buffered())?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The example where I used .Buffered() was for a bufio.Reader attached to the TransactionalReader.
In this case, the amount of buffered data is always 0. Since everything read is being used, we can set the checkpoint to the last read byte, rather than the (last read) - (amount buffered by bufio.Reader)


func NewTransactionalReader(input <-chan *StreamChunk) *TransactionalReader {
t := &TransactionalReader{
buffer: bytes.NewBuffer(make([]byte, 0, 32*1024)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you choose 32K sized buffer for a specific reason?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose 32K because it's the size used by io.Copy(), however bytes.Buffer will grow the size of the buffer as necessary.
This is just a nice default size so in most cases it doesn't need to be reallocated.

current = int(t.bufReader.Size()) - t.bufReader.Len()
}

n := current
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initializing n to current seems to be pointless here? Why not var n int?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, looks like this is redundant. I'll see if I can refactor this function and make it a little simpler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually found another bug while looking at this. if bufReader != nil, t.buffer.Reset() shouldn't be called unless the offset is big enough.
I fixed this, and added a test for it.

Copy link
Contributor

@jpittis jpittis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to get my head around this buffered stuff so we can get this out the door too.Do say if you want to ship this yourself @xthexder. These PR comments are for my own understanding so that I can keep reviewing later.

}
stub.Reader.SetInterrupt(stub.Interrupt)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this Interupt monkey business.

current := t.buffer.Len()
if t.bufReader != nil {
current = int(t.bufReader.Size()) - t.bufReader.Len()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So current represents how much of the buffer we've already read.

  • 0 will say... okay we've read everything! Lock it in!
  • -5 will say we've read everything except for the last 5 bytes. Let's start reading again from the start of those last 5.
  • 5 will say we successfully read 5 bytes but not the rest. Let's start reading from the rest.
  • t.buffer.Len() will do the same thing is 0.

Okay I found this confusing but LGTM

}
if len(buf[n:]) > 0 {
t.reader.Read(buf[n:])
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct? Why are we trying to flush the reader? It's not buffered is it? I'll look at this again later and probably understand right away.

@miry miry self-assigned this Sep 17, 2021
@miry miry modified the milestones: 2.2.0, 2.3.0 Oct 22, 2021
@miry miry modified the milestones: 2.3.0, 2.4.0 Dec 23, 2021
@miry miry mentioned this pull request Jan 5, 2022
8 tasks
@miry miry mentioned this pull request Mar 3, 2022
18 tasks
@miry miry modified the milestones: 2.4.0, 2.5.0 Mar 3, 2022
@miry miry mentioned this pull request Sep 4, 2022
13 tasks
@miry miry modified the milestones: 2.5.0, 3.0.0 Sep 11, 2022
@miry miry removed their assignment Apr 24, 2023
@miry miry removed this from the 3.0.0 milestone Apr 24, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants