From 6897d72c3ee8dc02ee2c1419c33dbc32db624e2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Arrufat?= Date: Sat, 28 Feb 2026 21:26:51 +0900 Subject: [PATCH] mcp: simplify request processing to single-threaded --- src/main.zig | 3 +- src/mcp/Server.zig | 72 +--------------------------------------------- src/mcp/router.zig | 32 ++++++++++++++------- 3 files changed, 23 insertions(+), 84 deletions(-) diff --git a/src/main.zig b/src/main.zig index d312eb81..1d3b51fb 100644 --- a/src/main.zig +++ b/src/main.zig @@ -138,8 +138,7 @@ fn run(allocator: Allocator, main_arena: Allocator) !void { var mcp_server = try lp.mcp.Server.init(allocator, app); defer mcp_server.deinit(); - try mcp_server.start(); - lp.mcp.router.processRequests(mcp_server); + try lp.mcp.router.processRequests(mcp_server); }, else => unreachable, } diff --git a/src/mcp/Server.zig b/src/mcp/Server.zig index 08e6b978..c8843b5a 100644 --- a/src/mcp/Server.zig +++ b/src/mcp/Server.zig @@ -15,11 +15,6 @@ browser: *lp.Browser, session: *lp.Session, page: *lp.Page, -io_thread: ?std.Thread = null, -queue_mutex: std.Thread.Mutex = .{}, -queue_condition: std.Thread.Condition = .{}, -message_queue: std.ArrayListUnmanaged([]const u8) = .empty, - is_running: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), stdout_mutex: std.Thread.Mutex = .{}, @@ -30,7 +25,6 @@ pub fn init(allocator: std.mem.Allocator, app: *App) !*Self { self.allocator = allocator; self.app = app; - self.message_queue = .empty; self.http_client = try app.http.createClient(allocator); errdefer self.http_client.deinit(); @@ -50,14 +44,7 @@ pub fn init(allocator: std.mem.Allocator, app: *App) !*Self { } pub fn deinit(self: *Self) void { - self.stop(); - if (self.io_thread) |*thread| { - thread.join(); - } - for (self.message_queue.items) |msg| { - self.allocator.free(msg); - } - self.message_queue.deinit(self.allocator); + self.is_running.store(false, .seq_cst); self.browser.deinit(); self.allocator.destroy(self.browser); @@ -67,63 +54,6 @@ pub fn deinit(self: *Self) void { self.allocator.destroy(self); } -pub fn start(self: *Self) !void { - self.is_running.store(true, .seq_cst); - self.io_thread = try std.Thread.spawn(.{}, ioWorker, .{self}); -} - -pub fn stop(self: *Self) void { - self.is_running.store(false, .seq_cst); - self.queue_mutex.lock(); - self.queue_condition.signal(); - self.queue_mutex.unlock(); -} - -fn ioWorker(self: *Self) void { - var stdin_file = std.fs.File.stdin(); - var stdin_buf: [8192]u8 = undefined; - var stdin = stdin_file.reader(&stdin_buf); - - while (self.is_running.load(.seq_cst)) { - const msg_or_err = stdin.interface.adaptToOldInterface().readUntilDelimiterAlloc(self.allocator, '\n', 1024 * 1024 * 10); - if (msg_or_err) |msg| { - if (msg.len == 0) { - self.allocator.free(msg); - continue; - } - - self.queue_mutex.lock(); - self.message_queue.append(self.allocator, msg) catch |err| { - lp.log.err(.app, "MCP Queue failed", .{ .err = err }); - self.allocator.free(msg); - }; - self.queue_mutex.unlock(); - self.queue_condition.signal(); - } else |err| { - if (err == error.EndOfStream) { - self.stop(); - break; - } - lp.log.err(.app, "MCP IO Error", .{ .err = err }); - std.Thread.sleep(100 * std.time.ns_per_ms); - } - } -} - -pub fn getNextMessage(self: *Self) ?[]const u8 { - self.queue_mutex.lock(); - defer self.queue_mutex.unlock(); - - while (self.message_queue.items.len == 0 and self.is_running.load(.seq_cst)) { - self.queue_condition.wait(&self.queue_mutex); - } - - if (self.message_queue.items.len > 0) { - return self.message_queue.orderedRemove(0); - } - return null; -} - pub fn sendResponse(self: *Self, response: anytype) !void { self.stdout_mutex.lock(); defer self.stdout_mutex.unlock(); diff --git a/src/mcp/router.zig b/src/mcp/router.zig index d3d5071a..b07be916 100644 --- a/src/mcp/router.zig +++ b/src/mcp/router.zig @@ -8,20 +8,30 @@ const resources = @import("resources.zig"); const Server = @import("Server.zig"); const tools = @import("tools.zig"); -pub fn processRequests(server: *Server) void { +pub fn processRequests(server: *Server) !void { + var stdin_file = std.fs.File.stdin(); + var stdin_buf: [8192]u8 = undefined; + var stdin = stdin_file.reader(&stdin_buf); + + server.is_running.store(true, .seq_cst); + while (server.is_running.load(.seq_cst)) { - if (server.getNextMessage()) |msg| { - defer server.allocator.free(msg); + const msg = stdin.interface.adaptToOldInterface().readUntilDelimiterAlloc(server.allocator, '\n', 1024 * 1024 * 10) catch |err| { + if (err == error.EndOfStream) break; + return err; + }; + defer server.allocator.free(msg); - // Critical: Per-request Arena - var arena = std.heap.ArenaAllocator.init(server.allocator); - defer arena.deinit(); + if (msg.len == 0) continue; - handleMessage(server, arena.allocator(), msg) catch |err| { - log.err(.app, "MCP Error processing message", .{ .err = err }); - // We should ideally send a parse error response back, but it's hard to extract the ID if parsing failed entirely. - }; - } + // Critical: Per-request Arena + var arena = std.heap.ArenaAllocator.init(server.allocator); + defer arena.deinit(); + + handleMessage(server, arena.allocator(), msg) catch |err| { + log.err(.app, "MCP Error processing message", .{ .err = err }); + // We should ideally send a parse error response back, but it's hard to extract the ID if parsing failed entirely. + }; } }