Skip to content

Commit

Permalink
Replace ExternalStream with new ByteStream type (nushell#12774)
Browse files Browse the repository at this point in the history
# Description
This PR introduces a `ByteStream` type which is a `Read`-able stream of
bytes. Internally, it has an enum over three different byte stream
sources:
```rust
pub enum ByteStreamSource {
    Read(Box<dyn Read + Send + 'static>),
    File(File),
    Child(ChildProcess),
}
```

This is in comparison to the current `RawStream` type, which is an
`Iterator<Item = Vec<u8>>` and has to allocate for each read chunk.

Currently, `PipelineData::ExternalStream` serves a weird dual role where
it is either external command output or a wrapper around `RawStream`.
`ByteStream` makes this distinction more clear (via `ByteStreamSource`)
and replaces `PipelineData::ExternalStream` in this PR:
```rust
pub enum PipelineData {
    Empty,
    Value(Value, Option<PipelineMetadata>),
    ListStream(ListStream, Option<PipelineMetadata>),
    ByteStream(ByteStream, Option<PipelineMetadata>),
}
```

The PR is relatively large, but a decent amount of it is just repetitive
changes.

This PR fixes nushell#7017, fixes nushell#10763, and fixes nushell#12369.

This PR also improves performance when piping external commands. Nushell
should, in most cases, have competitive pipeline throughput compared to,
e.g., bash.
| Command | Before (MB/s) | After (MB/s) | Bash (MB/s) |
| -------------------------------------------------- | -------------:|
------------:| -----------:|
| `throughput \| rg 'x'` | 3059 | 3744 | 3739 |
| `throughput \| nu --testbin relay o> /dev/null` | 3508 | 8087 | 8136 |

# User-Facing Changes
- This is a breaking change for the plugin communication protocol,
because the `ExternalStreamInfo` was replaced with `ByteStreamInfo`.
Plugins now only have to deal with a single input stream, as opposed to
the previous three streams: stdout, stderr, and exit code.
- The output of `describe` has been changed for external/byte streams.
- Temporary breaking change: `bytes starts-with` no longer works with
byte streams. This is to keep the PR smaller, and `bytes ends-with`
already does not work on byte streams.
- If a process core dumped, then instead of having a `Value::Error` in
the `exit_code` column of the output returned from `complete`, it now is
a `Value::Int` with the negation of the signal number.

# After Submitting
- Update docs and book as necessary
- Release notes (e.g., plugin protocol changes)
- Adapt/convert commands to work with byte streams (high priority is
`str length`, `bytes starts-with`, and maybe `bytes ends-with`).
- Refactor the `tee` code, Devyn has already done some work on this.

---------

Co-authored-by: Devyn Cairns <[email protected]>
  • Loading branch information
2 people authored and FilipAndersson245 committed May 18, 2024
1 parent af0b611 commit af1c34b
Show file tree
Hide file tree
Showing 210 changed files with 3,980 additions and 4,037 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions crates/nu-cli/src/completions/completer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,8 @@ impl NuCompleter {
PipelineData::empty(),
);

match result {
Ok(pd) => {
let value = pd.into_value(span);
match result.and_then(|data| data.into_value(span)) {
Ok(value) => {
if let Value::List { vals, .. } = value {
let result =
map_value_completions(vals.iter(), Span::new(span.start, span.end), offset);
Expand Down
84 changes: 41 additions & 43 deletions crates/nu-cli/src/completions/custom_completions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,55 +74,53 @@ impl Completer for CustomCompletion {

// Parse result
let suggestions = result
.map(|pd| {
let value = pd.into_value(span);
match &value {
Value::Record { val, .. } => {
let completions = val
.get("completions")
.and_then(|val| {
val.as_list()
.ok()
.map(|it| map_value_completions(it.iter(), span, offset))
})
.unwrap_or_default();
let options = val.get("options");

if let Some(Value::Record { val: options, .. }) = &options {
let should_sort = options
.get("sort")
.and_then(|val| val.as_bool().ok())
.unwrap_or(false);
.and_then(|data| data.into_value(span))
.map(|value| match &value {
Value::Record { val, .. } => {
let completions = val
.get("completions")
.and_then(|val| {
val.as_list()
.ok()
.map(|it| map_value_completions(it.iter(), span, offset))
})
.unwrap_or_default();
let options = val.get("options");

if should_sort {
self.sort_by = SortBy::Ascending;
}
if let Some(Value::Record { val: options, .. }) = &options {
let should_sort = options
.get("sort")
.and_then(|val| val.as_bool().ok())
.unwrap_or(false);

custom_completion_options = Some(CompletionOptions {
case_sensitive: options
.get("case_sensitive")
.and_then(|val| val.as_bool().ok())
.unwrap_or(true),
positional: options
.get("positional")
.and_then(|val| val.as_bool().ok())
.unwrap_or(true),
match_algorithm: match options.get("completion_algorithm") {
Some(option) => option
.coerce_string()
.ok()
.and_then(|option| option.try_into().ok())
.unwrap_or(MatchAlgorithm::Prefix),
None => completion_options.match_algorithm,
},
});
if should_sort {
self.sort_by = SortBy::Ascending;
}

completions
custom_completion_options = Some(CompletionOptions {
case_sensitive: options
.get("case_sensitive")
.and_then(|val| val.as_bool().ok())
.unwrap_or(true),
positional: options
.get("positional")
.and_then(|val| val.as_bool().ok())
.unwrap_or(true),
match_algorithm: match options.get("completion_algorithm") {
Some(option) => option
.coerce_string()
.ok()
.and_then(|option| option.try_into().ok())
.unwrap_or(MatchAlgorithm::Prefix),
None => completion_options.match_algorithm,
},
});
}
Value::List { vals, .. } => map_value_completions(vals.iter(), span, offset),
_ => vec![],

completions
}
Value::List { vals, .. } => map_value_completions(vals.iter(), span, offset),
_ => vec![],
})
.unwrap_or_default();

Expand Down
5 changes: 3 additions & 2 deletions crates/nu-cli/src/config_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,14 +306,15 @@ pub fn migrate_old_plugin_file(engine_state: &EngineState, storage_path: &str) -
let mut engine_state = engine_state.clone();
let mut stack = Stack::new();

if !eval_source(
if eval_source(
&mut engine_state,
&mut stack,
&old_contents,
&old_plugin_file_path.to_string_lossy(),
PipelineData::Empty,
false,
) {
) != 0
{
return false;
}

Expand Down
8 changes: 4 additions & 4 deletions crates/nu-cli/src/eval_cmds.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use log::info;
use miette::Result;
use nu_engine::{convert_env_values, eval_block};
use nu_parser::parse;
use nu_protocol::{
Expand Down Expand Up @@ -59,9 +58,10 @@ pub fn evaluate_commands(
t_mode.coerce_str()?.parse().unwrap_or_default();
}

let exit_code = pipeline.print(engine_state, stack, no_newline, false)?;
if exit_code != 0 {
std::process::exit(exit_code as i32);
if let Some(status) = pipeline.print(engine_state, stack, no_newline, false)? {
if status.code() != 0 {
std::process::exit(status.code())
}
}

info!("evaluate {}:{}:{}", file!(), line!(), column!());
Expand Down
23 changes: 13 additions & 10 deletions crates/nu-cli/src/eval_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub fn evaluate_file(
engine_state.merge_delta(working_set.delta)?;

// Check if the file contains a main command.
if engine_state.find_decl(b"main", &[]).is_some() {
let exit_code = if engine_state.find_decl(b"main", &[]).is_some() {
// Evaluate the file, but don't run main yet.
let pipeline =
match eval_block::<WithoutDebug>(engine_state, stack, &block, PipelineData::empty()) {
Expand All @@ -109,26 +109,29 @@ pub fn evaluate_file(
};

// Print the pipeline output of the last command of the file.
let exit_code = pipeline.print(engine_state, stack, true, false)?;
if exit_code != 0 {
std::process::exit(exit_code as i32);
if let Some(status) = pipeline.print(engine_state, stack, true, false)? {
if status.code() != 0 {
std::process::exit(status.code())
}
}

// Invoke the main command with arguments.
// Arguments with whitespaces are quoted, thus can be safely concatenated by whitespace.
let args = format!("main {}", args.join(" "));
if !eval_source(
eval_source(
engine_state,
stack,
args.as_bytes(),
"<commandline>",
input,
true,
) {
std::process::exit(1);
}
} else if !eval_source(engine_state, stack, &file, file_path_str, input, true) {
std::process::exit(1);
)
} else {
eval_source(engine_state, stack, &file, file_path_str, input, true)
};

if exit_code != 0 {
std::process::exit(exit_code)
}

info!("evaluate {}:{}:{}", file!(), line!(), column!());
Expand Down
3 changes: 1 addition & 2 deletions crates/nu-cli/src/menus/menu_completions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ impl Completer for NuMenuCompleter {

let res = eval_block::<WithoutDebug>(&self.engine_state, &mut self.stack, block, input);

if let Ok(values) = res {
let values = values.into_value(self.span);
if let Ok(values) = res.and_then(|data| data.into_value(self.span)) {
convert_to_suggestions(values, line, pos, self.only_buffer_difference)
} else {
Vec::new()
Expand Down
144 changes: 63 additions & 81 deletions crates/nu-cli/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use nu_parser::{escape_quote_string, lex, parse, unescape_unquote_string, Token,
use nu_protocol::{
debugger::WithoutDebug,
engine::{EngineState, Stack, StateWorkingSet},
print_if_stream, report_error, report_error_new, PipelineData, ShellError, Span, Value,
report_error, report_error_new, PipelineData, ShellError, Span, Value,
};
#[cfg(windows)]
use nu_utils::enable_vt_processing;
Expand Down Expand Up @@ -206,9 +206,48 @@ pub fn eval_source(
fname: &str,
input: PipelineData,
allow_return: bool,
) -> bool {
) -> i32 {
let start_time = std::time::Instant::now();

let exit_code = match evaluate_source(engine_state, stack, source, fname, input, allow_return) {
Ok(code) => code.unwrap_or(0),
Err(err) => {
report_error_new(engine_state, &err);
1
}
};

stack.add_env_var(
"LAST_EXIT_CODE".to_string(),
Value::int(exit_code.into(), Span::unknown()),
);

// reset vt processing, aka ansi because illbehaved externals can break it
#[cfg(windows)]
{
let _ = enable_vt_processing();
}

perf(
&format!("eval_source {}", &fname),
start_time,
file!(),
line!(),
column!(),
engine_state.get_config().use_ansi_coloring,
);

exit_code
}

fn evaluate_source(
engine_state: &mut EngineState,
stack: &mut Stack,
source: &[u8],
fname: &str,
input: PipelineData,
allow_return: bool,
) -> Result<Option<i32>, ShellError> {
let (block, delta) = {
let mut working_set = StateWorkingSet::new(engine_state);
let output = parse(
Expand All @@ -222,97 +261,40 @@ pub fn eval_source(
}

if let Some(err) = working_set.parse_errors.first() {
set_last_exit_code(stack, 1);
report_error(&working_set, err);
return false;
return Ok(Some(1));
}

(output, working_set.render())
};

if let Err(err) = engine_state.merge_delta(delta) {
set_last_exit_code(stack, 1);
report_error_new(engine_state, &err);
return false;
}
engine_state.merge_delta(delta)?;

let b = if allow_return {
let pipeline = if allow_return {
eval_block_with_early_return::<WithoutDebug>(engine_state, stack, &block, input)
} else {
eval_block::<WithoutDebug>(engine_state, stack, &block, input)
};
}?;

match b {
Ok(pipeline_data) => {
let config = engine_state.get_config();
let result;
if let PipelineData::ExternalStream {
stdout: stream,
stderr: stderr_stream,
exit_code,
..
} = pipeline_data
{
result = print_if_stream(stream, stderr_stream, false, exit_code);
} else if let Some(hook) = config.hooks.display_output.clone() {
match eval_hook(
engine_state,
stack,
Some(pipeline_data),
vec![],
&hook,
"display_output",
) {
Err(err) => {
result = Err(err);
}
Ok(val) => {
result = val.print(engine_state, stack, false, false);
}
}
} else {
result = pipeline_data.print(engine_state, stack, true, false);
}

match result {
Err(err) => {
report_error_new(engine_state, &err);
return false;
}
Ok(exit_code) => {
set_last_exit_code(stack, exit_code);
}
}

// reset vt processing, aka ansi because illbehaved externals can break it
#[cfg(windows)]
{
let _ = enable_vt_processing();
}
}
Err(err) => {
set_last_exit_code(stack, 1);
report_error_new(engine_state, &err);
return false;
}
}
perf(
&format!("eval_source {}", &fname),
start_time,
file!(),
line!(),
column!(),
engine_state.get_config().use_ansi_coloring,
);

true
}
let status = if let PipelineData::ByteStream(stream, ..) = pipeline {
stream.print(false)?
} else {
if let Some(hook) = engine_state.get_config().hooks.display_output.clone() {
let pipeline = eval_hook(
engine_state,
stack,
Some(pipeline),
vec![],
&hook,
"display_output",
)?;
pipeline.print(engine_state, stack, false, false)
} else {
pipeline.print(engine_state, stack, true, false)
}?
};

fn set_last_exit_code(stack: &mut Stack, exit_code: i64) {
stack.add_env_var(
"LAST_EXIT_CODE".to_string(),
Value::int(exit_code, Span::unknown()),
);
Ok(status.map(|status| status.code()))
}

#[cfg(test)]
Expand Down

0 comments on commit af1c34b

Please sign in to comment.