Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix crash in Windows PipeReader #10674

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/bun-internal-test/src/runner.node.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ if (failing_tests.length) {
.replace(/^::(group|endgroup|error|warning|set-output|add-matcher|remove-matcher).*$/gm, "");

if (failing_output.length > 1024 * 64) {
failing_output = failing_output.slice(0, 1024 * 64) + `\n\n[truncated output (length: ${failing_output.length})]`;
failing_output =
failing_output.slice(0, 1024 * 64) + `\n\n[truncated output (length: ${failing_output.length})]\n`;
}

report += failing_output;
Expand Down
11 changes: 11 additions & 0 deletions src/Global.zig
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,17 @@ pub fn exit(code: u8) noreturn {
}

pub fn exitWide(code: u32) noreturn {
if (bun.crash_handler.panicking.load(.SeqCst) != 0) {
// Another thread is panicking. If we exit now, their message will be
// truncated. Instead, we let them handle the exit. Note that the panic
// handler NEVER calls this exitWide

// Sleep forever without hammering the CPU
var futex = std.atomic.Value(u32).init(0);
while (true) std.Thread.Futex.wait(&futex, 0);
comptime unreachable;
}

if (comptime Environment.isMac) {
std.c.exit(@bitCast(code));
}
Expand Down
20 changes: 20 additions & 0 deletions src/bun.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3155,3 +3155,23 @@ pub inline fn unsafeAssert(condition: bool) void {
unreachable;
}
}

pub fn unexpectedTag(union_value: anytype) noreturn {
@setCold(true);

const non_pointer = switch (@typeInfo(@TypeOf(union_value))) {
else => union_value,
.Pointer => union_value.*,
};

Output.panic("Unexpected tag: " ++ @typeName(@TypeOf(non_pointer)) ++ ".{s}", .{
@tagName(non_pointer),
});
}

pub fn releaseNonNull(optional: anytype) @TypeOf(optional.?) {
return optional orelse {
const name = @typeName(@TypeOf(optional.?));
Output.panic("Unexpected null (" ++ name ++ ")", .{});
};
}
6 changes: 5 additions & 1 deletion src/crash_handler.zig
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var has_printed_message = false;

/// Non-zero whenever the program triggered a panic.
/// The counter is incremented/decremented atomically.
var panicking = std.atomic.Value(u8).init(0);
pub var panicking = std.atomic.Value(u8).init(0);

// Locked to avoid interleaving panic messages from multiple threads.
var panic_mutex = std.Thread.Mutex{};
Expand Down Expand Up @@ -151,6 +151,10 @@ pub fn crashHandler(
writer.writeAll(Output.prettyFmt("<red>", true)) catch std.os.abort();
}

if (bun.Output.shouldLogPid()) |pid| {
writer.print("process {d} ", .{pid}) catch std.os.abort();
}

writer.writeAll("panic") catch std.os.abort();

if (Output.enable_ansi_colors) {
Expand Down
7 changes: 6 additions & 1 deletion src/deps/libuv.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1777,6 +1777,11 @@ pub const fs_t = extern struct {
pub inline fn deinit(this: *fs_t) void {
this.assertInitialized();
uv_fs_req_cleanup(this);
if (bun.Environment.isDebug) {
const flags = this.flags;
this.* = undefined;
this.flags = flags;
}
this.assertCleanedUp();
}

Expand All @@ -1798,7 +1803,7 @@ pub const fs_t = extern struct {
if ((this.flags & UV_FS_CLEANEDUP) != 0) {
return;
}
@panic("uv_fs_t was not cleaned up. it is expected to call .deinit() on the fs_t here.");
@panic("uv_fs_t was not cleaned up. this is either a double-use of fs_t or a missing .deinit() call");
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/fd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,14 @@ pub const FDImpl = packed struct {
@compileError("invalid format string for FDImpl.format. must be empty like '{}'");
}

// if (std.debug.runtime_safety) {
// const undef = comptime std.mem.bytesAsValue(FDImpl, &([_]u8{0xAA} ** @sizeOf(FDImpl))).encode();
// if (this.encode() == undef) {
// try writer.writeAll("[garbage memory]");
// return;
// }
// }

switch (env.os) {
else => {
const fd = this.system();
Expand Down
63 changes: 29 additions & 34 deletions src/io/PipeReader.zig
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ pub fn WindowsPipeReader(
return;
}
var this: *This = bun.cast(*This, fs.data);
const fd = bun.toFD(fs.file.fd);
fs.deinit();

switch (nread_int) {
Expand All @@ -393,36 +394,28 @@ pub fn WindowsPipeReader(
this.onRead(.{ .err = err }, "", .progress);
return;
}
defer {
// if we are not paused we keep reading until EOF or err
if (!this.flags.is_paused) {
if (this.source) |source| {
if (source == .file) {
const file = source.file;
source.setData(this);
const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024);
file.iov = uv.uv_buf_t.init(buf);
if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, onFileRead).toError(.write)) |err| {
this.flags.is_paused = true;
// we should inform the error if we are unable to keep reading
this.onRead(.{ .err = err }, "", .progress);
}
// we got some data lets get the current iov
const len: usize = @intCast(nread_int);
switch (bun.releaseNonNull(this.source)) {
.file => |file| {
defer if (!this.flags.is_paused) {
file.fs.assertCleanedUp();
const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024);
file.iov = uv.uv_buf_t.init(buf);
bun.Output.debug("continue reading(0x{X}, {}, {d}) = {}", .{ @intFromPtr(this), fd, buf.len, this.flags.is_paused });
if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, onFileRead).toError(.write)) |err| {
this.flags.is_paused = true;
// we should inform the error if we are unable to keep reading
this.onRead(.{ .err = err }, "", .progress);
}
}
}
}
file.fs.data = this;
};

const len: usize = @intCast(nread_int);
// we got some data lets get the current iov
if (this.source) |source| {
if (source == .file) {
var buf = source.file.iov.slice();
var buf = file.iov.slice();
return this.onRead(.{ .result = len }, buf[0..len], .progress);
}
},
else => |t| bun.unexpectedTag(t),
}
// ops we should not hit this lets fail with EPIPE
bun.assert(false);
return this.onRead(.{ .err = bun.sys.Error.fromCode(bun.C.E.PIPE, .read) }, "", .progress);
},
}
}
Expand All @@ -439,9 +432,11 @@ pub fn WindowsPipeReader(
source.setData(this);
const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024);
file.iov = uv.uv_buf_t.init(buf);
bun.Output.debug("startReading(0x{X}, {}, {d}) = {}", .{ @intFromPtr(this), bun.toFD(file.file), buf.len, this.flags.is_paused });
if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, onFileRead).toError(.write)) |err| {
return .{ .err = err };
}
file.fs.data = this;
},
else => {
if (uv.uv_read_start(source.toStream(), &onStreamAlloc, @ptrCast(&onStreamRead)).toError(.open)) |err| {
Expand Down Expand Up @@ -473,11 +468,10 @@ pub fn WindowsPipeReader(
if (this.source) |source| {
switch (source) {
.sync_file, .file => |file| {
if (!this.flags.is_paused) {
// always cancel the current one
file.fs.cancel();
this.flags.is_paused = true;
}
// cancel the current one
file.fs.cancel();
this.flags.is_paused = true;

// always use close_fs here because we can have a operation in progress
file.close_fs.data = file;
_ = uv.uv_fs_close(uv.Loop.get(), &file.close_fs, file.file, onFileClose);
Expand All @@ -487,13 +481,13 @@ pub fn WindowsPipeReader(
pipe.close(onPipeClose);
},
.tty => |tty| {
tty.data = tty;
tty.close(onTTYClose);

if (tty == &Source.stdin_tty) {
Source.stdin_tty = undefined;
Source.stdin_tty_init = false;
}

tty.data = tty;
tty.close(onTTYClose);
},
}
this.source = null;
Expand Down Expand Up @@ -538,6 +532,7 @@ pub fn WindowsPipeReader(
switch (hasMore) {
.eof => {
// we call report EOF and close
_ = this.stopReading();
_ = onReadChunk(this, slice, hasMore);
close(this);
},
Expand Down
84 changes: 51 additions & 33 deletions src/output.zig
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ pub const Source = struct {
break :brk std.io.FixedBufferStream([]u8);
} else {
break :brk File;
// var stdout = std.io.getStdOut();
// return @TypeOf(std.io.bufferedWriter(stdout.writer()));
}
};
pub const BufferedStream: type = struct {
Expand Down Expand Up @@ -528,6 +526,9 @@ pub noinline fn println(comptime fmt: string, args: anytype) void {
/// Text automatically buffers
pub fn debug(comptime fmt: string, args: anytype) void {
if (comptime Environment.isRelease) return;
if (shouldLogPid()) |pid| {
print("[{d}] ", .{pid});
}
prettyErrorln("<d>DEBUG:<r> " ++ fmt, args);
flush();
}
Expand Down Expand Up @@ -555,6 +556,8 @@ pub noinline fn print(comptime fmt: string, args: anytype) callconv(std.builtin.
}
}

var debug_scoped_add_pid: union(enum) { yes: c_int, no, unknown } = .unknown;

/// Debug-only logs which should not appear in release mode
/// To enable a specific log at runtime, set the environment variable
/// BUN_DEBUG_${TAG} to 1
Expand Down Expand Up @@ -631,24 +634,31 @@ pub fn Scoped(comptime tag: anytype, comptime disabled: bool) type {
lock.lock();
defer lock.unlock();

if (Output.enable_ansi_colors_stdout and source_set and buffered_writer.unbuffered_writer.context.handle == writer().context.handle) {
out.print(comptime prettyFmt("<r><d>[" ++ tagname ++ "]<r> " ++ fmt, true), args) catch {
really_disable = true;
return;
};
buffered_writer.flush() catch {
really_disable = true;
return;
};
} else {
out.print(comptime prettyFmt("<r><d>[" ++ tagname ++ "]<r> " ++ fmt, false), args) catch {
really_disable = true;
return;
};
buffered_writer.flush() catch {
really_disable = true;
return;
};
switch (Output.enable_ansi_colors_stdout and source_set and buffered_writer.unbuffered_writer.context.handle == writer().context.handle) {
inline else => |colors| {
out.print(comptime prettyFmt("<r><d>", colors), .{}) catch {
really_disable = true;
return;
};
if (shouldLogPid()) |pid| {
out.print("[{d}] ", .{pid}) catch {
really_disable = true;
return;
};
}
out.print(comptime prettyFmt("[{s}]<r> ", colors), .{tagname}) catch {
really_disable = true;
return;
};
out.print(comptime prettyFmt(fmt, colors), args) catch {
really_disable = true;
return;
};
buffered_writer.flush() catch {
really_disable = true;
return;
};
},
}
}
};
Expand All @@ -658,6 +668,21 @@ pub fn scoped(comptime tag: anytype, comptime disabled: bool) LogFunction {
return Scoped(tag, disabled).log;
}

pub fn shouldLogPid() ?c_int {
if (debug_scoped_add_pid == .unknown) {
debug_scoped_add_pid = if (bun.getenvZ("BUN_DEBUG_ADD_PID")) |val|
if (bun.strings.eqlComptime(val, "1")) .{ .yes = getpid() } else .no
else
.no;
}

return switch (debug_scoped_add_pid) {
.yes => |pid| pid,
.no => null,
.unknown => unreachable,
};
}

// Valid "colors":
// <black>
// <blue>
Expand Down Expand Up @@ -970,9 +995,6 @@ pub fn initScopedDebugWriterAtStartup() void {

if (bun.getenvZ("BUN_DEBUG")) |path| {
if (path.len > 0 and !strings.eql(path, "0") and !strings.eql(path, "false")) {
if (std.fs.path.dirname(path)) |dir| {
std.fs.cwd().makePath(dir) catch {};
}

// do not use libuv through this code path, since it might not be initialized yet.
const pid = std.fmt.allocPrint(bun.default_allocator, "{d}", .{getpid()}) catch @panic("failed to allocate path");
Expand All @@ -981,16 +1003,12 @@ pub fn initScopedDebugWriterAtStartup() void {
const path_fmt = std.mem.replaceOwned(u8, bun.default_allocator, path, "{pid}", pid) catch @panic("failed to allocate path");
defer bun.default_allocator.free(path_fmt);

const fd = std.os.openat(
std.fs.cwd().fd,
path_fmt,
std.os.O.CREAT | std.os.O.WRONLY,
// on windows this is u0
if (Environment.isWindows) 0 else 0o644,
) catch |err_| {
Output.panic("Failed to open file for debug output: {s} ({s})", .{ @errorName(err_), path });
};
_ = bun.sys.ftruncate(bun.toFD(fd), 0); // windows
if (std.fs.path.dirname(path_fmt)) |dir|
std.fs.cwd().makePath(dir) catch {};

const fd = (std.fs.cwd().createFile(path_fmt, .{}) catch |err_| {
Output.panic("Failed to open file for debug output: {s} ({s})", .{ @errorName(err_), path_fmt });
}).handle;
ScopedDebugWriter.scoped_file_writer = File.from(fd).quietWriter();
return;
}
Expand Down
8 changes: 7 additions & 1 deletion test/integration/next-pages/test/dev-server-puppeteer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ async function main() {
await counter_root.$eval(".dec", x => (x as HTMLElement).click());
assert.strictEqual(await getCount(), "Count A: 1");

console.log("reloading");
const wait2 = waitForConsoleMessage(p, /counter a/);
p.reload({});
await waitForConsoleMessage(p, /counter a/);
await wait2;
console.log("reloaded");

assert.strictEqual(await p.$eval("code.font-bold", x => x.innerText), Bun.version);

Expand Down Expand Up @@ -108,6 +111,9 @@ async function main() {

try {
await main();
} catch (e) {
console.error(e);
process.exit(1);
} finally {
await b?.close?.();
}
3 changes: 2 additions & 1 deletion test/integration/next-pages/test/dev-server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,5 +149,6 @@ test.skipIf(puppeteer_unsupported)(
// @ts-expect-error
timeout = undefined;
},
30000,
// debug builds are slow
Bun.version.endsWith("-debug") ? Infinity : 30000,
);