New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replace ExternalStream
with new ByteStream
type
#12774
Conversation
…data` to `pipeline`
ExternalStream
with new Bytestream
typeExternalStream
with new ByteStream
type
That is an impressive chunk of work and a nice improvement! Before we run ahead just looking at the streaming side we need to talk about the error changes, to ensure we don't break our execution semantics in unintended ways.
What is up here? Is there a breakage we need to resolve before merging? |
I have the same concern about turning the exit codes into |
Ok, I think this should be mostly ready for review. Also, I stole some of your tests @devyn :P |
I'll have a look again, then. Thanks for all of your hard work! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the improvement! Just leaves some thought about stderr consumer thread.
(Some(out), Some(err)) => { | ||
// To avoid deadlocks, we must spawn a separate thread to wait on stderr. | ||
thread::scope(|s| { | ||
let err_thread = thread::Builder::new() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's good to name the thread, here is a reason: #7879
let stderr = self | ||
.stderr | ||
.take() | ||
.map(|stderr| thread::Builder::new().spawn(move || consume_pipe(stderr))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can name the thread here
Is #12369 still part of this after removing the main error breaking changes? |
Yes, this PR still fixes #12369. If there is no exit code (i.e., the process was terminated via signal), then This is sort of a stop-gap measure until we figure out how we want to report this information. |
// #[test] | ||
// fn short_stream_binary() { | ||
// let actual = nu!(r#" | ||
// nu --testbin repeater (0x[01]) 5 | bytes starts-with 0x[010101] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pin: commented out tests
@devyn so we agreed in the core team meeting today that once you give the green light and have reviewed the PR we will go ahead and land it... Thank you kindly for doing this ! 😄 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. I found one potential issue with bytes starts-with
. A lot of the files changed are just into_value()
related, with that being a Result
now, so this isn't quite as big as it looks.
Didn't have any issues running it yet, and exit code is definitely working now. I'm glad you changed the each
behaviour to Chunks
and got rid of implicit line buffering.
Great stuff!
for item in stream { | ||
let byte_slice = match &item { | ||
// String and binary data are valid byte patterns | ||
Ok(Value::String { val, .. }) => val.as_bytes(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removing this will be a breaking change, as this worked before:
^echo foo | bytes starts-with ("foo" | into binary)
In general, I think untyped byte streams should be able to be treated as binary
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That was me being lazy making the PR smaller 😄. After this, I intend to open another PR to restore that command and also make bytes ends-with
support byte streams properly.
// https://github.com/nushell/nushell/pull/9377 contains the reason | ||
// for not using BufWriter<File> | ||
// | ||
// TODO: flag to specify buffered (BufWriter), line buffered (LineWriter), or no buffering (File). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure this will ever be necessary, because the input stream is generally buffered in some way, even if by the command that produced it. If someone knows they want to use bigger buffers, it might be better to provide a command to adapt the stream with read_exact()
or read_until()
calls to ensure the desired chunking behaviour. And of course it should already be possible to do | lines | save
if you definitely want line buffering.
I like this behaviour though, it means that the output of an external command gets written to file with the flushing behaviour it was intended to have, which is particularly good if saving to something like a FIFO where the interactivity actually matters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yeah, that's a good observation! There might be an argument to add the flags for performance reasons (e.g., to avoid the allocations), but it's probably not worth it / it won't make a noticeable difference.
Thanks for taking the time to review this @devyn! |
@IanManske I am planning on landing this PR in the morning --- its 10pm in Oregon now... Unless someone beats me to it... I checked out your branch and ran all of the tests and it looks good to me... I will await you giving me the green light to let me know if its now ready to go. Based on Devyn's feedback it looks good to me... Thanks for all of your hard work on this PR... |
Sounds good, feel free to land it whenever @stormasm. Thanks! |
@IanManske cool ! thanks for giving the green light 👍 |
Wow, it’s great to see thoughtful improvements to internals like this. Big thanks to Ian and everyone who reviewed this. |
# Description Forgot that I fixed this already on my branch, but when printing without a display output hook, the implicit call to `table` gets its output mangled with newlines (since #12774). This happens when running `nu -c` or a script file. Here's that fix in one PR so it can be merged easily. # Tests + Formatting - 🟢 `toolkit fmt` - 🟢 `toolkit clippy` - 🟢 `toolkit test` - 🟢 `toolkit test stdlib`
# Description This PR introduces a `ByteStream` type which is a `Read`-able stream of bytes. Internally, it has an enum over three different byte stream sources: ```rust pub enum ByteStreamSource { Read(Box<dyn Read + Send + 'static>), File(File), Child(ChildProcess), } ``` This is in comparison to the current `RawStream` type, which is an `Iterator<Item = Vec<u8>>` and has to allocate for each read chunk. Currently, `PipelineData::ExternalStream` serves a weird dual role where it is either external command output or a wrapper around `RawStream`. `ByteStream` makes this distinction more clear (via `ByteStreamSource`) and replaces `PipelineData::ExternalStream` in this PR: ```rust pub enum PipelineData { Empty, Value(Value, Option<PipelineMetadata>), ListStream(ListStream, Option<PipelineMetadata>), ByteStream(ByteStream, Option<PipelineMetadata>), } ``` The PR is relatively large, but a decent amount of it is just repetitive changes. This PR fixes nushell#7017, fixes nushell#10763, and fixes nushell#12369. This PR also improves performance when piping external commands. Nushell should, in most cases, have competitive pipeline throughput compared to, e.g., bash. | Command | Before (MB/s) | After (MB/s) | Bash (MB/s) | | -------------------------------------------------- | -------------:| ------------:| -----------:| | `throughput \| rg 'x'` | 3059 | 3744 | 3739 | | `throughput \| nu --testbin relay o> /dev/null` | 3508 | 8087 | 8136 | # User-Facing Changes - This is a breaking change for the plugin communication protocol, because the `ExternalStreamInfo` was replaced with `ByteStreamInfo`. Plugins now only have to deal with a single input stream, as opposed to the previous three streams: stdout, stderr, and exit code. - The output of `describe` has been changed for external/byte streams. - Temporary breaking change: `bytes starts-with` no longer works with byte streams. This is to keep the PR smaller, and `bytes ends-with` already does not work on byte streams. - If a process core dumped, then instead of having a `Value::Error` in the `exit_code` column of the output returned from `complete`, it now is a `Value::Int` with the negation of the signal number. # After Submitting - Update docs and book as necessary - Release notes (e.g., plugin protocol changes) - Adapt/convert commands to work with byte streams (high priority is `str length`, `bytes starts-with`, and maybe `bytes ends-with`). - Refactor the `tee` code, Devyn has already done some work on this. --------- Co-authored-by: Devyn Cairns <[email protected]>
) # Description Forgot that I fixed this already on my branch, but when printing without a display output hook, the implicit call to `table` gets its output mangled with newlines (since nushell#12774). This happens when running `nu -c` or a script file. Here's that fix in one PR so it can be merged easily. # Tests + Formatting - 🟢 `toolkit fmt` - 🟢 `toolkit clippy` - 🟢 `toolkit test` - 🟢 `toolkit test stdlib`
I'm trying to write tests for I'm currently doing this, but it feels awkward: if let PipelineData::ByteStream(byte_stream, ..) = pipeline_data {
if let ByteStreamSource::Child(child) = byte_stream.into_source() {
if let Some(stderr) = child.stderr.as_mut() {
stderr.read(&mut buf);
}
}
} |
impl ByteStream {
pub fn write_to(self, dest: &mut impl Write) -> Result<Option<ExitStatus>, ShellError> {
// ...
}
} Generic reader/writer functions should take |
This is intentionally verbose. In fact, I should have maybe made it an unsafe API. Reading from
Oh, good to know. I guess |
Rust's safety guarantees doesn't cover deadlocks, so it's probably fine.
Unfortunate indeed. The Rust team tried to change it, but couldn't due to API breakage. |
Description
This PR introduces a
ByteStream
type which is aRead
-able stream of bytes. Internally, it has an enum over three different byte stream sources:This is in comparison to the current
RawStream
type, which is anIterator<Item = Vec<u8>>
and has to allocate for each read chunk.Currently,
PipelineData::ExternalStream
serves a weird dual role where it is either external command output or a wrapper aroundRawStream
.ByteStream
makes this distinction more clear (viaByteStreamSource
) and replacesPipelineData::ExternalStream
in this PR:The PR is relatively large, but a decent amount of it is just repetitive changes.
This PR fixes #7017, fixes #10763, and fixes #12369.
This PR also improves performance when piping external commands. Nushell should, in most cases, have competitive pipeline throughput compared to, e.g., bash.
throughput | rg 'x'
throughput | nu --testbin relay o> /dev/null
User-Facing Changes
ExternalStreamInfo
was replaced withByteStreamInfo
. Plugins now only have to deal with a single input stream, as opposed to the previous three streams: stdout, stderr, and exit code.describe
has been changed for external/byte streams.bytes starts-with
no longer works with byte streams. This is to keep the PR smaller, andbytes ends-with
already does not work on byte streams.Value::Error
in theexit_code
column of the output returned fromcomplete
, it now is aValue::Int
with the negation of the signal number.After Submitting
str length
,bytes starts-with
, and maybebytes ends-with
).tee
code, Devyn has already done some work on this.