Fix kqueue readable & writable check #23987
base: main
Conversation
Walkthrough
Implements kqueue event coalescing and dispatch changes: sorts and merges kevent entries by udata, dispatches coalesced events, and adjusts kqueue change flags to use EV_ONESHOT for certain write/FIN detection scenarios; EPOLL paths remain unchanged.

Pre-merge checks: ❌ 1 failed (warning), ✅ 1 passed
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
packages/bun-usockets/src/eventing/epoll_kqueue.c (2 hunks)
🔇 Additional comments (2)
packages/bun-usockets/src/eventing/epoll_kqueue.c (2)
363-363: LGTM: EV_ONESHOT is appropriate for FIN detection.
Using EV_ONESHOT for the writable filter when the poll is neither readable nor writable is correct. Since this is specifically for detecting FIN/EOF (as noted in the comment at line 362), a one-shot event is sufficient - you only need to be notified once that the connection is closed.
367-367: LGTM: EV_ONESHOT for writable events is standard practice.
Making writable events one-shot aligns with the PR objective to avoid event loop spam. The typical pattern is:
- Register for writable with EV_ONESHOT
- The event fires once when the socket becomes writable
- Write until EAGAIN/EWOULDBLOCK
- Re-register via us_poll_change if there is more data to write

This is more efficient than continuous writable notifications. Ensure that callers properly re-register for writable events when they have pending data and receive EAGAIN.
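For illustration only (none of this is code from the PR): a rough sketch of the re-arm step after a short write, where try_flush() and have_pending_data() are hypothetical stand-ins for the caller's buffering logic and kq, fd, udata are placeholders. Re-adding the filter is roughly what us_poll_change ends up doing on the kqueue backend when the writable bit is requested again.

```c
#include <sys/event.h>
#include <sys/types.h>
#include <errno.h>

/* Hypothetical hooks into the caller's write buffering: */
ssize_t try_flush(int fd);       /* writes until the buffer drains or a call fails */
int have_pending_data(int fd);   /* non-zero if bytes remain queued */

static void handle_writable(int kq, int fd, void *udata) {
    if (try_flush(fd) < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)
        && have_pending_data(fd)) {
        /* The one-shot filter was deleted when it fired, so register it again. */
        struct kevent ev;
        EV_SET(&ev, fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, udata);
        kevent(kq, &ev, 1, NULL, 0, NULL);
    }
}
```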
On kqueue (macOS), a single file descriptor can have multiple events (EVFILT_READ and EVFILT_WRITE) which arrive as separate kevent structures. This could cause the same poll to be dispatched multiple times per event loop tick. This change implements event coalescing:

1. Sort events by udata (poll pointer) using qsort
2. Coalesce adjacent events with matching udata into a single logical event
3. Mark duplicate events as skipped
4. Dispatch only the coalesced events

This ensures each poll is dispatched at most once per event loop iteration, with all of its ready events (readable/writable/error/eof) combined.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
packages/bun-usockets/src/eventing/epoll_kqueue.c (7 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Format
🔇 Additional comments (4)
packages/bun-usockets/src/eventing/epoll_kqueue.c (4)
27-45: LGTM: Clean coalescing infrastructure.
The KQueueEvent structure and comparison function are well-designed for the event coalescing approach. The bit fields efficiently pack the event state, and the comparison function correctly sorts by udata to group events for the same file descriptor.
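For readers without the full diff, a plausible reconstruction of the pieces this comment refers to (the field names match the hunks quoted later in this review; the exact layout in the PR may differ):

```c
#include <sys/event.h>

/* Sketch: one slot per ready kevent, with bit fields packing the merged state. */
struct KQueueEvent {
    unsigned int readable : 1;
    unsigned int writable : 1;
    unsigned int error    : 1;
    unsigned int eof      : 1;
    unsigned int skipped  : 1; /* set on duplicates merged into an earlier slot */
};

/* qsort comparator: orders kevent64_s entries by udata (the poll pointer),
 * so events for the same fd become adjacent and can be coalesced. */
static int kqueue_event_compare(const void *a, const void *b) {
    const struct kevent64_s *ka = (const struct kevent64_s *)a;
    const struct kevent64_s *kb = (const struct kevent64_s *)b;
    if (ka->udata < kb->udata) return -1;
    if (ka->udata > kb->udata) return 1;
    return 0;
}
```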
275-280: LGTM: Correct skip logic for coalesced events.
This correctly skips duplicate events that were merged during coalescing, ensuring each file descriptor is dispatched only once per iteration.
293-299: LGTM: Correctly uses coalesced event data.
The code now properly reads from the coalesced event structure, which correctly merged readable and writable flags using equality comparisons (filter == EVFILT_READ) rather than bitwise AND. This addresses the concern raised in the previous review about incorrect filter checks.
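For context on why equality (rather than bitwise AND) is the right check here, an illustrative aside that is not code from the PR: kqueue filter constants are small negative integers (EVFILT_READ is -1 and EVFILT_WRITE is -2 on macOS/BSD), so ANDing them almost always yields a non-zero value:

```c
#include <assert.h>
#include <stdint.h>
#include <sys/event.h>

int main(void) {
    int16_t filter = EVFILT_WRITE;           /* -2 */
    assert((filter & EVFILT_READ) != 0);     /* bitwise AND: false "readable" */
    assert(filter != EVFILT_READ);           /* equality check gets it right */
    assert(filter == EVFILT_WRITE);
    return 0;
}
```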
469-473: Verify EV_ONESHOT semantics and test impact for writable socket events.
EV_ONESHOT causes the event to return only the first occurrence of the filter being triggered; after the user retrieves the event from the kqueue, it is deleted. For writable socket events, this means:
- After the first writable notification fires, the filter is automatically deleted
- Applications must explicitly re-add the filter to receive further notifications
- With EV_ONESHOT, no matter how much data is sent, only a single notification is received; the notification must be re-armed manually.
Observations from codebase:
- Line 469: Adds EV_ONESHOT for writable when detecting FIN (neither readable nor writable case)
- Line 473: Applies EV_ONESHOT to ALL writable event changes via conditional
- Codebase patterns show us_poll_change() is called after write failures (socket.c:299, 373, 410; udp.c:64), which would re-enable the filter
- However, 11 test failures across HTTP, fetch, and bake/dev tests suggest edge cases not covered
Concerns:
The change assumes all write scenarios follow the pattern: write fails → call us_poll_change() to re-enable. However, this may break scenarios that expect persistent writable notifications for streaming writes, or applications that don't explicitly call us_poll_change after partial writes. By default kqueue uses level-triggered behavior (the event returns as long as the condition holds); without EV_CLEAR, EVFILT_WRITE would continuously notify while the socket is writable. EV_ONESHOT fundamentally changes this to edge-triggered behavior that requires manual re-arming.

Verify that all code paths performing write operations explicitly re-enable the writable filter when needed, or consider using EV_DISPATCH (which disables but doesn't delete the filter) as a less disruptive alternative.
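A minimal sketch of the EV_DISPATCH variant mentioned above (illustrative only; kq, fd, and udata are placeholders, and this is not code from the PR):

```c
#include <sys/event.h>

/* Register the filter once; EV_DISPATCH disables it after each delivery
 * instead of deleting it. */
static void arm_writable(int kq, int fd, void *udata) {
    struct kevent ev;
    EV_SET(&ev, fd, EVFILT_WRITE, EV_ADD | EV_DISPATCH, 0, 0, udata);
    kevent(kq, &ev, 1, NULL, 0, NULL);
}

/* When more data is queued later, re-enable the existing (disabled) filter. */
static void rearm_writable(int kq, int fd, void *udata) {
    struct kevent ev;
    EV_SET(&ev, fd, EVFILT_WRITE, EV_ENABLE | EV_DISPATCH, 0, 0, udata);
    kevent(kq, &ev, 1, NULL, 0, NULL);
}
```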
```c
/* For kqueue, coalesce events by udata to handle multiple filters per fd */
struct KQueueEvent coalesced_events[1024];
memset(coalesced_events, 0, sizeof(coalesced_events));

/* First pass: sort events by udata */
if (loop->num_ready_polls > 0) {
  qsort(loop->ready_polls, loop->num_ready_polls, sizeof(struct kevent64_s), kqueue_event_compare);
}

/* Second pass: coalesce events and fill KQueueEvent struct */
uint64_t prev_udata = 0;
int prev_index = -1;
for (int i = 0; i < loop->num_ready_polls; i++) {
  const struct kevent64_s* current_kevent = &loop->ready_polls[i];
  const uint64_t current_udata = current_kevent->udata;
  const int16_t filter = current_kevent->filter;
  const uint16_t flags = current_kevent->flags;
  const uint32_t fflags = current_kevent->fflags;

  int target_index = i;

  /* If this udata matches previous, coalesce into previous event */
  if (i > 0 && current_udata == prev_udata && prev_index >= 0) {
    target_index = prev_index;
    coalesced_events[i].skipped = 1;
  } else {
    prev_index = i;
    prev_udata = current_udata;
  }

  /* Accumulate event flags */
  if (filter == EVFILT_READ) {
    coalesced_events[target_index].readable = 1;
  } else if (filter == EVFILT_WRITE) {
    coalesced_events[target_index].writable = 1;
  }

  if (flags & EV_ERROR) {
    coalesced_events[target_index].error = 1;
  }
  if (flags & EV_EOF) {
    coalesced_events[target_index].eof = 1;
  }
}
```
🧹 Nitpick | 🔵 Trivial
Remove unused variable fflags.
The variable fflags is declared at line 244 but never used in the coalescing loop. This will trigger compiler warnings.
Apply this diff to remove the unused variable:

```diff
  const struct kevent64_s* current_kevent = &loop->ready_polls[i];
  const uint64_t current_udata = current_kevent->udata;
  const int16_t filter = current_kevent->filter;
  const uint16_t flags = current_kevent->flags;
- const uint32_t fflags = current_kevent->fflags;
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```c
/* For kqueue, coalesce events by udata to handle multiple filters per fd */
struct KQueueEvent coalesced_events[1024];
memset(coalesced_events, 0, sizeof(coalesced_events));

/* First pass: sort events by udata */
if (loop->num_ready_polls > 0) {
  qsort(loop->ready_polls, loop->num_ready_polls, sizeof(struct kevent64_s), kqueue_event_compare);
}

/* Second pass: coalesce events and fill KQueueEvent struct */
uint64_t prev_udata = 0;
int prev_index = -1;
for (int i = 0; i < loop->num_ready_polls; i++) {
  const struct kevent64_s* current_kevent = &loop->ready_polls[i];
  const uint64_t current_udata = current_kevent->udata;
  const int16_t filter = current_kevent->filter;
  const uint16_t flags = current_kevent->flags;

  int target_index = i;

  /* If this udata matches previous, coalesce into previous event */
  if (i > 0 && current_udata == prev_udata && prev_index >= 0) {
    target_index = prev_index;
    coalesced_events[i].skipped = 1;
  } else {
    prev_index = i;
    prev_udata = current_udata;
  }

  /* Accumulate event flags */
  if (filter == EVFILT_READ) {
    coalesced_events[target_index].readable = 1;
  } else if (filter == EVFILT_WRITE) {
    coalesced_events[target_index].writable = 1;
  }

  if (flags & EV_ERROR) {
    coalesced_events[target_index].error = 1;
  }
  if (flags & EV_EOF) {
    coalesced_events[target_index].eof = 1;
  }
}
```
🤖 Prompt for AI Agents
In packages/bun-usockets/src/eventing/epoll_kqueue.c around lines 227 to 270,
the local variable `fflags` is declared but never used inside the coalescing
loop which causes compiler warnings; remove the `const uint32_t fflags =
current_kevent->fflags;` declaration (and any future unused references) so only
needed variables are declared, leaving the rest of the loop logic unchanged.
```c
/* For kqueue, coalesce events by udata to handle multiple filters per fd */
struct KQueueEvent coalesced_events[1024];
memset(coalesced_events, 0, sizeof(coalesced_events));

/* First pass: sort events by udata */
if (loop->num_ready_polls > 0) {
  qsort(loop->ready_polls, loop->num_ready_polls, sizeof(struct kevent64_s), kqueue_event_compare);
}

/* Second pass: coalesce events and fill KQueueEvent struct */
uint64_t prev_udata = 0;
int prev_index = -1;
for (int i = 0; i < loop->num_ready_polls; i++) {
  const struct kevent64_s* current_kevent = &loop->ready_polls[i];
  const uint64_t current_udata = current_kevent->udata;
  const int16_t filter = current_kevent->filter;
  const uint16_t flags = current_kevent->flags;
  const uint32_t fflags = current_kevent->fflags;

  int target_index = i;

  /* If this udata matches previous, coalesce into previous event */
  if (i > 0 && current_udata == prev_udata && prev_index >= 0) {
    target_index = prev_index;
    coalesced_events[i].skipped = 1;
  } else {
    prev_index = i;
    prev_udata = current_udata;
  }

  /* Accumulate event flags */
  if (filter == EVFILT_READ) {
    coalesced_events[target_index].readable = 1;
  } else if (filter == EVFILT_WRITE) {
    coalesced_events[target_index].writable = 1;
  }

  if (flags & EV_ERROR) {
    coalesced_events[target_index].error = 1;
  }
  if (flags & EV_EOF) {
    coalesced_events[target_index].eof = 1;
  }
}
```
🧹 Nitpick | 🔵 Trivial
Remove unused variable and consider extracting coalescing logic.
The variable fflags at line 359 is unused (same issue as line 244). Additionally, the coalescing logic is duplicated between us_loop_run and us_loop_run_bun_tick. Consider extracting this into a helper function to reduce code duplication and improve maintainability.
Apply this diff to remove the unused variable:

```diff
  const struct kevent64_s* current_kevent = &loop->ready_polls[i];
  const uint64_t current_udata = current_kevent->udata;
  const int16_t filter = current_kevent->filter;
  const uint16_t flags = current_kevent->flags;
- const uint32_t fflags = current_kevent->fflags;
```

Optional refactoring: Extract the ~60 lines of coalescing logic into a helper function like:

```c
static void coalesce_kqueue_events(struct kevent64_s *ready_polls,
                                   int num_ready_polls,
                                   struct KQueueEvent *coalesced_events);
```

🤖 Prompt for AI Agents
In packages/bun-usockets/src/eventing/epoll_kqueue.c around lines 342 to 385,
remove the unused local variable `fflags` (declared at line ~359) and replace
its declaration/usage with nothing since it is not referenced; additionally
factor the ~60 lines of repeated coalescing logic (sorting, looping over
ready_polls and populating KQueueEvent entries) into a new static helper such as
`static void coalesce_kqueue_events(struct kevent64_s *ready_polls, int
num_ready_polls, struct KQueueEvent *coalesced_events)` and call it from both
`us_loop_run` and `us_loop_run_bun_tick` to eliminate duplication and improve
maintainability.
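A possible shape for that helper, assembled from the two duplicated hunks quoted earlier (a sketch that assumes the KQueueEvent struct and kqueue_event_compare comparator already defined in this file, and that both call sites zero the coalesced_events array before calling; the actual extraction may differ):

```c
static void coalesce_kqueue_events(struct kevent64_s *ready_polls,
                                   int num_ready_polls,
                                   struct KQueueEvent *coalesced_events) {
    /* Sort by udata so events for the same poll become adjacent. */
    if (num_ready_polls > 0) {
        qsort(ready_polls, num_ready_polls, sizeof(struct kevent64_s),
              kqueue_event_compare);
    }

    uint64_t prev_udata = 0;
    int prev_index = -1;
    for (int i = 0; i < num_ready_polls; i++) {
        const struct kevent64_s *current_kevent = &ready_polls[i];
        int target_index = i;

        /* Duplicate udata: merge into the previous slot and mark this one skipped. */
        if (i > 0 && current_kevent->udata == prev_udata && prev_index >= 0) {
            target_index = prev_index;
            coalesced_events[i].skipped = 1;
        } else {
            prev_index = i;
            prev_udata = current_kevent->udata;
        }

        /* Accumulate readable/writable/error/eof onto the target slot. */
        if (current_kevent->filter == EVFILT_READ) {
            coalesced_events[target_index].readable = 1;
        } else if (current_kevent->filter == EVFILT_WRITE) {
            coalesced_events[target_index].writable = 1;
        }
        if (current_kevent->flags & EV_ERROR) {
            coalesced_events[target_index].error = 1;
        }
        if (current_kevent->flags & EV_EOF) {
            coalesced_events[target_index].eof = 1;
        }
    }
}
```

Both us_loop_run and us_loop_run_bun_tick would then memset their coalesced_events buffer and call this helper before the dispatch loop.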
What does this PR do?
How did you verify your code works?