mcp: use io poll for stdin and integrate message loop

Replaces blocking stdin reads with `std.io.poll` to allow macrotasks to
run. Removes the stdout mutex as I/O is now serialized.
This commit is contained in:
Adrià Arrufat
2026-03-02 10:04:23 +09:00
parent 42b5e32473
commit 41b81c8b05
2 changed files with 38 additions and 33 deletions

View File

@@ -18,8 +18,6 @@ page: *lp.Page,
is_running: std.atomic.Value(bool) = .init(false),
stdout_mutex: std.Thread.Mutex = .{},
pub fn init(allocator: std.mem.Allocator, app: *App) !*Self {
const self = try allocator.create(Self);
errdefer allocator.destroy(self);
@@ -55,13 +53,9 @@ pub fn deinit(self: *Self) void {
self.allocator.destroy(self);
}
pub fn sendResponse(self: *Self, response: anytype) !void {
self.stdout_mutex.lock();
defer self.stdout_mutex.unlock();
var stdout_file = std.fs.File.stdout();
var stdout_buf: [8192]u8 = undefined;
var stdout = stdout_file.writer(&stdout_buf);
pub fn sendResponse(_: *Self, response: anytype) !void {
var buffer: [8192]u8 = undefined;
var stdout = std.fs.File.stdout().writer(&buffer);
try std.json.Stringify.value(response, .{ .emit_null_optional_fields = false }, &stdout.interface);
try stdout.interface.writeByte('\n');
try stdout.interface.flush();

View File

@@ -9,40 +9,51 @@ const Server = @import("Server.zig");
const tools = @import("tools.zig");
pub fn processRequests(server: *Server) !void {
var stdin_file = std.fs.File.stdin();
var stdin_buf: [8192]u8 = undefined;
var stdin = stdin_file.reader(&stdin_buf);
server.is_running.store(true, .release);
const Streams = enum { stdin };
var poller = std.io.poll(server.allocator, Streams, .{ .stdin = std.fs.File.stdin() });
defer poller.deinit();
const reader = poller.reader(.stdin);
var arena: std.heap.ArenaAllocator = .init(server.allocator);
defer arena.deinit();
var msg_buf = std.Io.Writer.Allocating.init(server.allocator);
defer msg_buf.deinit();
while (server.is_running.load(.acquire)) {
msg_buf.clearRetainingCapacity();
const n = try stdin.interface.streamDelimiterLimit(&msg_buf.writer, '\n', .limited(1024 * 1024 * 10));
const ms_to_next_task = (try server.browser.runMacrotasks()) orelse 10_000;
var found_newline = true;
_ = stdin.interface.discardDelimiterInclusive('\n') catch |err| switch (err) {
error.EndOfStream => found_newline = false,
else => return err,
};
// Poll until the next macrotask is scheduled. This will block if no data is available.
const poll_ok = try poller.pollTimeout(ms_to_next_task * std.time.ns_per_ms);
if (n == 0 and !found_newline) break;
while (true) {
const buffered = reader.buffered();
if (std.mem.indexOfScalar(u8, buffered, '\n')) |idx| {
const line = buffered[0..idx];
if (line.len > 0) {
handleMessage(server, arena.allocator(), line) catch |err| {
log.warn(.mcp, "Error processing message", .{ .err = err });
};
_ = arena.reset(.{ .retain_with_limit = 32 * 1024 });
}
reader.toss(idx + 1);
} else {
break;
}
}
const msg = msg_buf.written();
if (msg.len == 0) continue;
if (!poll_ok) {
// Check if we have any data left in the buffer that didn't end with a newline
const buffered = reader.buffered();
if (buffered.len > 0) {
handleMessage(server, arena.allocator(), buffered) catch |err| {
log.warn(.mcp, "Error processing last message", .{ .err = err });
};
}
break;
}
handleMessage(server, arena.allocator(), msg) catch |err| {
log.warn(.mcp, "Error processing message", .{ .err = err });
// We should ideally send a parse error response back, but it's hard to extract the ID if parsing failed entirely.
};
// 32KB: avoid reallocations while keeping memory footprint low.
_ = arena.reset(.{ .retain_with_limit = 32 * 1024 });
server.browser.runMessageLoop();
}
}