Skip to content

Commit

Permalink
Add EventLoop#run(blocking) and EventLoop#interrupt (#14568)
Browse files Browse the repository at this point in the history
Co-authored-by: Johannes Müller <[email protected]>
  • Loading branch information
ysbaddaden and straight-shoota committed May 22, 2024
1 parent b563cba commit 8817739
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/crystal/scheduler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class Crystal::Scheduler
resume(runnable) unless runnable == @thread.current_fiber
break
else
@event_loop.run_once
@event_loop.run(blocking: true)
end
end
end
Expand Down
24 changes: 22 additions & 2 deletions src/crystal/system/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,28 @@ abstract class Crystal::EventLoop
Crystal::Scheduler.event_loop
end

# Runs the event loop.
abstract def run_once : Nil
# Runs the loop.
#
# Returns immediately if events are activable. Set `blocking` to false to
# return immediately if there are no activable events; set it to true to wait
# for activable events, which will block the current thread until then.
#
# Returns `true` on normal returns (e.g. has activated events, has pending
# events but blocking was false) and `false` when there are no registered
# events.
abstract def run(blocking : Bool) : Bool

# Tells a blocking run loop to no longer wait for events to activate. It may
# for example enqueue a NOOP event with an immediate (or past) timeout. Having
# activated an event, the loop shall return, allowing the blocked thread to
# continue.
#
# Should be a NOOP when the loop isn't running or is running in a nonblocking
# mode.
#
# NOTE: we assume that multiple threads won't run the event loop at the same
# time in parallel, but this assumption may change in the future!
abstract def interrupt : Nil

# Create a new resume event for a fiber.
abstract def create_resume_event(fiber : Fiber) : Event
Expand Down
17 changes: 11 additions & 6 deletions src/crystal/system/unix/event_libevent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,23 @@ module Crystal::LibEvent
Crystal::LibEvent::Event.new(event)
end

def run_loop : Nil
LibEvent2.event_base_loop(@base, LibEvent2::EventLoopFlags::None)
end

def run_once : Nil
LibEvent2.event_base_loop(@base, LibEvent2::EventLoopFlags::Once)
# NOTE: may return `true` even if no event has been triggered (e.g.
# nonblocking), but `false` means that nothing was processed.
def loop(once : Bool, nonblock : Bool) : Bool
flags = LibEvent2::EventLoopFlags::None
flags |= LibEvent2::EventLoopFlags::Once if once
flags |= LibEvent2::EventLoopFlags::NonBlock if nonblock
LibEvent2.event_base_loop(@base, flags) == 0
end

def loop_break : Nil
LibEvent2.event_base_loopbreak(@base)
end

def loop_exit : Nil
LibEvent2.event_base_loopexit(@base, nil)
end

def new_dns_base(init = true)
DnsBase.new LibEvent2.evdns_base_new(@base, init ? 1 : 0)
end
Expand Down
9 changes: 6 additions & 3 deletions src/crystal/system/unix/event_loop_libevent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ class Crystal::LibEvent::EventLoop < Crystal::EventLoop
end
{% end %}

# Runs the event loop.
def run_once : Nil
event_base.run_once
def run(blocking : Bool) : Bool
event_base.loop(once: true, nonblock: !blocking)
end

def interrupt : Nil
event_base.loop_exit
end

# Create a new resume event for a fiber.
Expand Down
1 change: 1 addition & 0 deletions src/crystal/system/unix/lib_event2.cr
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ lib LibEvent2
fun event_base_dispatch(eb : EventBase) : Int
fun event_base_loop(eb : EventBase, flags : EventLoopFlags) : Int
fun event_base_loopbreak(eb : EventBase) : Int
fun event_base_loopexit(EventBase, LibC::Timeval*) : LibC::Int
fun event_set_log_callback(callback : (Int, UInt8*) -> Nil)
fun event_enable_debug_mode
fun event_reinit(eb : EventBase) : Int
Expand Down
8 changes: 6 additions & 2 deletions src/crystal/system/wasi/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ end
# :nodoc:
class Crystal::Wasi::EventLoop < Crystal::EventLoop
# Runs the event loop.
def run_once : Nil
raise NotImplementedError.new("Crystal::Wasi::EventLoop.run_once")
def run(blocking : Bool) : Bool
raise NotImplementedError.new("Crystal::Wasi::EventLoop.run")
end

def interrupt : Nil
raise NotImplementedError.new("Crystal::Wasi::EventLoop.interrupt")
end

# Create a new resume event for a fiber.
Expand Down
62 changes: 49 additions & 13 deletions src/crystal/system/win32/event_loop_iocp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ class Crystal::Iocp::EventLoop < Crystal::EventLoop
# This is a list of resume and timeout events managed outside of IOCP.
@queue = Deque(Crystal::Iocp::Event).new

@lock = Crystal::SpinLock.new
@interrupted = Atomic(Bool).new(false)
@blocked_thread = Atomic(Thread?).new(nil)

# Returns the base IO Completion Port
getter iocp : LibC::HANDLE do
create_completion_port(LibC::INVALID_HANDLE_VALUE, nil)
Expand All @@ -37,33 +41,49 @@ class Crystal::Iocp::EventLoop < Crystal::EventLoop

# Runs the event loop and enqueues the fiber for the next upcoming event or
# completion.
def run_once : Nil
def run(blocking : Bool) : Bool
# Pull the next upcoming event from the event queue. This determines the
# timeout for waiting on the completion port.
# OPTIMIZE: Implement @queue as a priority queue in order to avoid this
# explicit search for the lowest value and dequeue more efficient.
next_event = @queue.min_by?(&.wake_at)

unless next_event
Crystal::System.print_error "Warning: No runnables in scheduler. Exiting program.\n"
::exit
end
# no registered events: nothing to wait for
return false unless next_event

now = Time.monotonic

if next_event.wake_at > now
wait_time = next_event.wake_at - now
# There is no event ready to wake. So we wait for completions with a
# timeout for the next event wake time.
# There is no event ready to wake. We wait for completions until the next
# event wake time, unless nonblocking or already interrupted (timeout
# immediately).
if blocking
@lock.sync do
if @interrupted.get(:acquire)
blocking = false
else
# memorize the blocked thread (so we can alert it)
@blocked_thread.set(Thread.current, :release)
end
end
end

timed_out = IO::Overlapped.wait_queued_completions(wait_time.total_milliseconds) do |fiber|
wait_time = blocking ? (next_event.wake_at - now).total_milliseconds : 0
timed_out = IO::Overlapped.wait_queued_completions(wait_time, alertable: blocking) do |fiber|
# This block may run multiple times. Every single fiber gets enqueued.
fiber.enqueue
end

# If the wait for completion timed out we've reached the wake time and
# continue with waking `next_event`.
return unless timed_out
@blocked_thread.set(nil, :release)
@interrupted.set(false, :release)

# The wait for completion enqueued events.
return true unless timed_out

# Wait for completion timed out but it may have been interrupted or we ask
# for immediate timeout (nonblocking), so we check for the next event
# readyness again:
return false if next_event.wake_at > Time.monotonic
end

# next_event gets activated because its wake time is passed, either from the
Expand All @@ -81,7 +101,7 @@ class Crystal::Iocp::EventLoop < Crystal::EventLoop
# This would avoid the scheduler needing to looking at runnable again just
# to notice it's still empty. The lock involved there should typically be
# uncontested though, so it's probably not a big deal.
return if fiber.dead?
return false if fiber.dead?

# A timeout event needs special handling because it does not necessarily
# means to resume the fiber directly, in case a different select branch
Expand All @@ -92,6 +112,22 @@ class Crystal::Iocp::EventLoop < Crystal::EventLoop
else
fiber.enqueue
end

# We enqueued a fiber.
true
end

def interrupt : Nil
thread = nil

@lock.sync do
@interrupted.set(true)
thread = @blocked_thread.swap(nil, :acquire)
end
return unless thread

# alert the thread to interrupt GetQueuedCompletionStatusEx
LibC.QueueUserAPC(->(ptr : LibC::ULONG_PTR) {}, thread, LibC::ULONG_PTR.new(0))
end

def enqueue(event : Crystal::Iocp::Event)
Expand Down
6 changes: 4 additions & 2 deletions src/io/overlapped.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@ module IO::Overlapped
property fiber : Fiber?
end

def self.wait_queued_completions(timeout, &)
def self.wait_queued_completions(timeout, alertable = false, &)
overlapped_entries = uninitialized LibC::OVERLAPPED_ENTRY[1]

if timeout > UInt64::MAX
timeout = LibC::INFINITE
else
timeout = timeout.to_u64
end
result = LibC.GetQueuedCompletionStatusEx(Crystal::EventLoop.current.iocp, overlapped_entries, overlapped_entries.size, out removed, timeout, false)
result = LibC.GetQueuedCompletionStatusEx(Crystal::EventLoop.current.iocp, overlapped_entries, overlapped_entries.size, out removed, timeout, alertable)
if result == 0
error = WinError.value
if timeout && error.wait_timeout?
return true
elsif alertable && error.value == LibC::WAIT_IO_COMPLETION
return true
else
raise IO::Error.from_os_error("GetQueuedCompletionStatusEx", error)
end
Expand Down
1 change: 1 addition & 0 deletions src/lib_c/x86_64-windows-msvc/c/processthreadsapi.cr
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ lib LibC
fun GetProcessTimes(hProcess : HANDLE, lpCreationTime : FILETIME*, lpExitTime : FILETIME*,
lpKernelTime : FILETIME*, lpUserTime : FILETIME*) : BOOL
fun SwitchToThread : BOOL
fun QueueUserAPC(pfnAPC : PAPCFUNC, hThread : HANDLE, dwData : ULONG_PTR) : DWORD

PROCESS_QUERY_INFORMATION = 0x0400
end
2 changes: 2 additions & 0 deletions src/lib_c/x86_64-windows-msvc/c/winnt.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ lib LibC
alias HANDLE = Void*
alias HMODULE = Void*

alias PAPCFUNC = ULONG_PTR ->

INVALID_FILE_ATTRIBUTES = DWORD.new!(-1)
FILE_ATTRIBUTE_DIRECTORY = 0x10
FILE_ATTRIBUTE_HIDDEN = 0x2
Expand Down

0 comments on commit 8817739

Please sign in to comment.