From 41b81c8b05057fbab1fc40cbce1b3dcd8de16860 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Arrufat?= Date: Mon, 2 Mar 2026 10:04:23 +0900 Subject: [PATCH] mcp: use io poll for stdin and integrate message loop Replaces blocking stdin reads with `std.io.poll` to allow macrotasks to run. Removes the stdout mutex as I/O is now serialized. --- src/mcp/Server.zig | 12 +++------- src/mcp/router.zig | 59 +++++++++++++++++++++++++++------------------- 2 files changed, 38 insertions(+), 33 deletions(-) diff --git a/src/mcp/Server.zig b/src/mcp/Server.zig index 2df4b792..3f2f259c 100644 --- a/src/mcp/Server.zig +++ b/src/mcp/Server.zig @@ -18,8 +18,6 @@ page: *lp.Page, is_running: std.atomic.Value(bool) = .init(false), -stdout_mutex: std.Thread.Mutex = .{}, - pub fn init(allocator: std.mem.Allocator, app: *App) !*Self { const self = try allocator.create(Self); errdefer allocator.destroy(self); @@ -55,13 +53,9 @@ pub fn deinit(self: *Self) void { self.allocator.destroy(self); } -pub fn sendResponse(self: *Self, response: anytype) !void { - self.stdout_mutex.lock(); - defer self.stdout_mutex.unlock(); - - var stdout_file = std.fs.File.stdout(); - var stdout_buf: [8192]u8 = undefined; - var stdout = stdout_file.writer(&stdout_buf); +pub fn sendResponse(_: *Self, response: anytype) !void { + var buffer: [8192]u8 = undefined; + var stdout = std.fs.File.stdout().writer(&buffer); try std.json.Stringify.value(response, .{ .emit_null_optional_fields = false }, &stdout.interface); try stdout.interface.writeByte('\n'); try stdout.interface.flush(); diff --git a/src/mcp/router.zig b/src/mcp/router.zig index 98fdccbb..a791ba58 100644 --- a/src/mcp/router.zig +++ b/src/mcp/router.zig @@ -9,40 +9,51 @@ const Server = @import("Server.zig"); const tools = @import("tools.zig"); pub fn processRequests(server: *Server) !void { - var stdin_file = std.fs.File.stdin(); - var stdin_buf: [8192]u8 = undefined; - var stdin = stdin_file.reader(&stdin_buf); - server.is_running.store(true, .release); + const Streams = enum { stdin }; + var poller = std.io.poll(server.allocator, Streams, .{ .stdin = std.fs.File.stdin() }); + defer poller.deinit(); + + const reader = poller.reader(.stdin); + var arena: std.heap.ArenaAllocator = .init(server.allocator); defer arena.deinit(); - var msg_buf = std.Io.Writer.Allocating.init(server.allocator); - defer msg_buf.deinit(); - while (server.is_running.load(.acquire)) { - msg_buf.clearRetainingCapacity(); - const n = try stdin.interface.streamDelimiterLimit(&msg_buf.writer, '\n', .limited(1024 * 1024 * 10)); + const ms_to_next_task = (try server.browser.runMacrotasks()) orelse 10_000; - var found_newline = true; - _ = stdin.interface.discardDelimiterInclusive('\n') catch |err| switch (err) { - error.EndOfStream => found_newline = false, - else => return err, - }; + // Poll until the next macrotask is scheduled. This will block if no data is available. + const poll_ok = try poller.pollTimeout(ms_to_next_task * std.time.ns_per_ms); - if (n == 0 and !found_newline) break; + while (true) { + const buffered = reader.buffered(); + if (std.mem.indexOfScalar(u8, buffered, '\n')) |idx| { + const line = buffered[0..idx]; + if (line.len > 0) { + handleMessage(server, arena.allocator(), line) catch |err| { + log.warn(.mcp, "Error processing message", .{ .err = err }); + }; + _ = arena.reset(.{ .retain_with_limit = 32 * 1024 }); + } + reader.toss(idx + 1); + } else { + break; + } + } - const msg = msg_buf.written(); - if (msg.len == 0) continue; + if (!poll_ok) { + // Check if we have any data left in the buffer that didn't end with a newline + const buffered = reader.buffered(); + if (buffered.len > 0) { + handleMessage(server, arena.allocator(), buffered) catch |err| { + log.warn(.mcp, "Error processing last message", .{ .err = err }); + }; + } + break; + } - handleMessage(server, arena.allocator(), msg) catch |err| { - log.warn(.mcp, "Error processing message", .{ .err = err }); - // We should ideally send a parse error response back, but it's hard to extract the ID if parsing failed entirely. - }; - - // 32KB: avoid reallocations while keeping memory footprint low. - _ = arena.reset(.{ .retain_with_limit = 32 * 1024 }); + server.browser.runMessageLoop(); } }