From 78edf6d324409c19c8b578192d95d65365299ed6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Arrufat?= Date: Mon, 2 Mar 2026 21:25:07 +0900 Subject: [PATCH] mcp: simplify I/O architecture and remove test harness --- src/main.zig | 10 +++- src/mcp.zig | 1 - src/mcp/Server.zig | 92 +++++++++---------------------- src/mcp/router.zig | 89 +++++++++++++----------------- src/mcp/testing.zig | 130 -------------------------------------------- 5 files changed, 72 insertions(+), 250 deletions(-) delete mode 100644 src/mcp/testing.zig diff --git a/src/main.zig b/src/main.zig index 97883de4..f06cf41e 100644 --- a/src/main.zig +++ b/src/main.zig @@ -136,10 +136,16 @@ fn run(allocator: Allocator, main_arena: Allocator) !void { log.opts.format = .logfmt; - var mcp_server = try lp.mcp.Server.init(allocator, app, std.fs.File.stdout()); + var stdout_buf: [4096]u8 = undefined; + var stdout = std.fs.File.stdout().writer(&stdout_buf); + + var mcp_server = try lp.mcp.Server.init(allocator, app, &stdout.interface); defer mcp_server.deinit(); - try lp.mcp.router.processRequests(mcp_server, std.fs.File.stdin()); + var stdin_buf: [4096]u8 = undefined; + var stdin = std.fs.File.stdin().reader(&stdin_buf); + + try lp.mcp.router.processRequests(mcp_server, &stdin.interface); }, else => unreachable, } diff --git a/src/mcp.zig b/src/mcp.zig index 9af6fd2d..41998f5a 100644 --- a/src/mcp.zig +++ b/src/mcp.zig @@ -1,4 +1,3 @@ pub const Server = @import("mcp/Server.zig"); pub const protocol = @import("mcp/protocol.zig"); pub const router = @import("mcp/router.zig"); -pub const testing = @import("mcp/testing.zig"); diff --git a/src/mcp/Server.zig b/src/mcp/Server.zig index cda8c278..e1e94a0e 100644 --- a/src/mcp/Server.zig +++ b/src/mcp/Server.zig @@ -4,7 +4,10 @@ const lp = @import("lightpanda"); const App = @import("../App.zig"); const HttpClient = @import("../http/Client.zig"); +const testing = @import("../testing.zig"); const protocol = @import("protocol.zig"); +const router = @import("router.zig"); + const Self = @This(); allocator: std.mem.Allocator, @@ -16,16 +19,15 @@ browser: lp.Browser, session: *lp.Session, page: *lp.Page, -is_running: std.atomic.Value(bool) = .init(false), -out_stream: std.fs.File, +writer: *std.io.Writer, -pub fn init(allocator: std.mem.Allocator, app: *App, out_stream: std.fs.File) !*Self { +pub fn init(allocator: std.mem.Allocator, app: *App, writer: *std.io.Writer) !*Self { const self = try allocator.create(Self); errdefer allocator.destroy(self); self.allocator = allocator; self.app = app; - self.out_stream = out_stream; + self.writer = writer; self.http_client = try app.http.createClient(allocator); errdefer self.http_client.deinit(); @@ -43,8 +45,6 @@ pub fn init(allocator: std.mem.Allocator, app: *App, out_stream: std.fs.File) !* } pub fn deinit(self: *Self) void { - self.is_running.store(false, .release); - self.browser.deinit(); self.notification.deinit(); self.http_client.deinit(); @@ -53,11 +53,11 @@ pub fn deinit(self: *Self) void { } pub fn sendResponse(self: *Self, response: anytype) !void { - var aw: std.Io.Writer.Allocating = .init(self.allocator); + var aw: std.io.Writer.Allocating = .init(self.allocator); defer aw.deinit(); try std.json.Stringify.value(response, .{ .emit_null_optional_fields = false }, &aw.writer); try aw.writer.writeByte('\n'); - try self.out_stream.writeAll(aw.written()); + try self.writer.writeAll(aw.writer.buffered()); } pub fn sendResult(self: *Self, id: std.json.Value, result: anytype) !void { @@ -82,67 +82,27 @@ pub fn sendError(self: *Self, id: std.json.Value, code: protocol.ErrorCode, mess }); } -const testing = @import("../testing.zig"); -const McpHarness = @import("testing.zig").McpHarness; +test "MCP Integration: synchronous smoke test" { + const allocator = testing.allocator; + const app = testing.test_app; -test "MCP Integration: smoke test" { - const harness = try McpHarness.init(testing.allocator, testing.test_app); - defer harness.deinit(); - - harness.thread = try std.Thread.spawn(.{}, testIntegrationSmokeInternal, .{harness}); - try harness.runServer(); -} - -fn testIntegrationSmokeInternal(harness: *McpHarness) void { - const aa = harness.allocator; - var arena = std.heap.ArenaAllocator.init(aa); - defer arena.deinit(); - const allocator = arena.allocator(); - - harness.sendRequest( + const input = \\{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"test-client","version":"1.0.0"}}} - ) catch |err| { - harness.test_error = err; - return; - }; - - const response1 = harness.readResponse(allocator) catch |err| { - harness.test_error = err; - return; - }; - testing.expect(std.mem.indexOf(u8, response1, "\"id\":1") != null) catch |err| { - harness.test_error = err; - return; - }; - testing.expect(std.mem.indexOf(u8, response1, "\"tools\":{}") != null) catch |err| { - harness.test_error = err; - return; - }; - testing.expect(std.mem.indexOf(u8, response1, "\"resources\":{}") != null) catch |err| { - harness.test_error = err; - return; - }; - - harness.sendRequest( \\{"jsonrpc":"2.0","id":2,"method":"tools/list"} - ) catch |err| { - harness.test_error = err; - return; - }; + ; - const response2 = harness.readResponse(allocator) catch |err| { - harness.test_error = err; - return; - }; - testing.expect(std.mem.indexOf(u8, response2, "\"id\":2") != null) catch |err| { - harness.test_error = err; - return; - }; - testing.expect(std.mem.indexOf(u8, response2, "\"name\":\"goto\"") != null) catch |err| { - harness.test_error = err; - return; - }; + var in_reader: std.io.Reader = .fixed(input); + var out_alloc: std.io.Writer.Allocating = .init(allocator); + defer out_alloc.deinit(); - harness.server.is_running.store(false, .release); - _ = harness.client_out.writeAll("\n") catch {}; + var server: *Self = try .init(allocator, app, &out_alloc.writer); + defer server.deinit(); + + try router.processRequests(server, &in_reader); + + const output = out_alloc.writer.buffered(); + try testing.expect(std.mem.indexOf(u8, output, "\"id\":1") != null); + try testing.expect(std.mem.indexOf(u8, output, "\"tools\":{}") != null); + try testing.expect(std.mem.indexOf(u8, output, "\"id\":2") != null); + try testing.expect(std.mem.indexOf(u8, output, "\"name\":\"goto\"") != null); } diff --git a/src/mcp/router.zig b/src/mcp/router.zig index 417ca913..7b21a37b 100644 --- a/src/mcp/router.zig +++ b/src/mcp/router.zig @@ -5,40 +5,27 @@ const resources = @import("resources.zig"); const Server = @import("Server.zig"); const tools = @import("tools.zig"); -pub fn processRequests(server: *Server, in_stream: std.fs.File) !void { - server.is_running.store(true, .release); +pub fn processRequests(server: *Server, reader: *std.io.Reader) !void { + var arena: std.heap.ArenaAllocator = .init(server.allocator); + defer arena.deinit(); - const Streams = enum { stdin }; - var poller = std.io.poll(server.allocator, Streams, .{ .stdin = in_stream }); - defer poller.deinit(); + while (true) { + _ = arena.reset(.retain_capacity); + const aa = arena.allocator(); - const r = poller.reader(.stdin); + const buffered_line = reader.takeDelimiter('\n') catch |err| switch (err) { + error.StreamTooLong => { + log.err(.mcp, "Message too long", .{}); + continue; + }, + else => return err, + } orelse break; - while (server.is_running.load(.acquire)) { - const poll_result = try poller.pollTimeout(100 * std.time.ns_per_ms); - - if (!poll_result) { - // EOF or all streams closed - server.is_running.store(false, .release); - break; - } - - while (true) { - const buffered = r.buffered(); - const newline_idx = std.mem.indexOfScalar(u8, buffered, '\n') orelse break; - const line = buffered[0 .. newline_idx + 1]; - - const trimmed = std.mem.trim(u8, line, " \r\n\t"); - if (trimmed.len > 0) { - var arena = std.heap.ArenaAllocator.init(server.allocator); - defer arena.deinit(); - - handleMessage(server, arena.allocator(), trimmed) catch |err| { - log.err(.mcp, "Failed to handle message", .{ .err = err, .msg = trimmed }); - }; - } - - r.toss(line.len); + const trimmed = std.mem.trim(u8, buffered_line, " \r\t"); + if (trimmed.len > 0) { + handleMessage(server, aa, trimmed) catch |err| { + log.err(.mcp, "Failed to handle message", .{ .err = err, .msg = trimmed }); + }; } } } @@ -106,35 +93,36 @@ fn handleInitialize(server: *Server, req: protocol.Request) !void { } const testing = @import("../testing.zig"); -const McpHarness = @import("testing.zig").McpHarness; test "handleMessage - synchronous unit tests" { - // We need a server, but we want it to write to our fbs - // Server.init currently takes std.fs.File, we might need to refactor it - // to take a generic writer if we want to be truly "cranky" and avoid OS files. - // For now, let's use the harness as it's already set up, but call handleMessage directly. - const harness = try McpHarness.init(testing.allocator, testing.test_app); - defer harness.deinit(); + const allocator = testing.allocator; + const app = testing.test_app; - var arena = std.heap.ArenaAllocator.init(testing.allocator); + var out_alloc = std.io.Writer.Allocating.init(allocator); + defer out_alloc.deinit(); + + var server = try Server.init(allocator, app, &out_alloc.writer); + defer server.deinit(); + + var arena = std.heap.ArenaAllocator.init(allocator); defer arena.deinit(); const aa = arena.allocator(); // 1. Valid request - try handleMessage(harness.server, aa, + try handleMessage(server, aa, \\{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}} ); - const resp1 = try harness.readResponse(aa); - try testing.expect(std.mem.indexOf(u8, resp1, "\"id\":1") != null); - try testing.expect(std.mem.indexOf(u8, resp1, "\"name\":\"lightpanda\"") != null); + try testing.expect(std.mem.indexOf(u8, out_alloc.writer.buffered(), "\"id\":1") != null); + try testing.expect(std.mem.indexOf(u8, out_alloc.writer.buffered(), "\"name\":\"lightpanda\"") != null); + out_alloc.writer.end = 0; // 2. Method not found - try handleMessage(harness.server, aa, + try handleMessage(server, aa, \\{"jsonrpc":"2.0","id":2,"method":"unknown_method"} ); - const resp2 = try harness.readResponse(aa); - try testing.expect(std.mem.indexOf(u8, resp2, "\"id\":2") != null); - try testing.expect(std.mem.indexOf(u8, resp2, "\"code\":-32601") != null); + try testing.expect(std.mem.indexOf(u8, out_alloc.writer.buffered(), "\"id\":2") != null); + try testing.expect(std.mem.indexOf(u8, out_alloc.writer.buffered(), "\"code\":-32601") != null); + out_alloc.writer.end = 0; // 3. Parse error { @@ -142,9 +130,8 @@ test "handleMessage - synchronous unit tests" { log.opts.filter_scopes = &.{.mcp}; defer log.opts.filter_scopes = old_filter; - try handleMessage(harness.server, aa, "invalid json"); - const resp3 = try harness.readResponse(aa); - try testing.expect(std.mem.indexOf(u8, resp3, "\"id\":null") != null); - try testing.expect(std.mem.indexOf(u8, resp3, "\"code\":-32700") != null); + try handleMessage(server, aa, "invalid json"); + try testing.expect(std.mem.indexOf(u8, out_alloc.writer.buffered(), "\"id\":null") != null); + try testing.expect(std.mem.indexOf(u8, out_alloc.writer.buffered(), "\"code\":-32700") != null); } } diff --git a/src/mcp/testing.zig b/src/mcp/testing.zig deleted file mode 100644 index b7ecb568..00000000 --- a/src/mcp/testing.zig +++ /dev/null @@ -1,130 +0,0 @@ -const std = @import("std"); -const lp = @import("lightpanda"); -const App = @import("../App.zig"); -const Server = @import("Server.zig"); -const router = @import("router.zig"); - -pub const McpHarness = struct { - allocator: std.mem.Allocator, - app: *App, - server: *Server, - - // Client view of the communication - client_in: std.fs.File, // Client reads from this (server's stdout) - client_out: std.fs.File, // Client writes to this (server's stdin) - - // Server view of the communication - server_in: std.fs.File, // Server reads from this (client's stdout) - server_out: std.fs.File, // Server writes to this (client's stdin) - - thread: ?std.Thread = null, - test_error: ?anyerror = null, - - const Pipe = struct { - read: std.fs.File, - write: std.fs.File, - - fn init() !Pipe { - const fds = try std.posix.pipe(); - return .{ - .read = .{ .handle = fds[0] }, - .write = .{ .handle = fds[1] }, - }; - } - - fn close(self: Pipe) void { - self.read.close(); - self.write.close(); - } - }; - - pub fn init(allocator: std.mem.Allocator, app: *App) !*McpHarness { - const self = try allocator.create(McpHarness); - errdefer allocator.destroy(self); - - self.allocator = allocator; - self.app = app; - self.thread = null; - self.test_error = null; - - const stdin_pipe = try Pipe.init(); - errdefer stdin_pipe.close(); - - const stdout_pipe = try Pipe.init(); - errdefer { - stdin_pipe.close(); - stdout_pipe.close(); - } - - self.server_in = stdin_pipe.read; - self.client_out = stdin_pipe.write; - self.client_in = stdout_pipe.read; - self.server_out = stdout_pipe.write; - - self.server = try Server.init(allocator, app, self.server_out); - errdefer self.server.deinit(); - - return self; - } - - pub fn deinit(self: *McpHarness) void { - self.server.is_running.store(false, .release); - - // Wake up the server's poll loop by writing a newline - self.client_out.writeAll("\n") catch {}; - - // Closing the client's output will also send EOF to the server - self.client_out.close(); - - if (self.thread) |t| t.join(); - - self.server.deinit(); - - // Server handles are closed here if they weren't already - self.server_in.close(); - self.server_out.close(); - self.client_in.close(); - // self.client_out is already closed above - - self.allocator.destroy(self); - } - - pub fn runServer(self: *McpHarness) !void { - try router.processRequests(self.server, self.server_in); - if (self.test_error) |err| return err; - } - - pub fn sendRequest(self: *McpHarness, request_json: []const u8) !void { - try self.client_out.writeAll(request_json); - if (request_json.len > 0 and request_json[request_json.len - 1] != '\n') { - try self.client_out.writeAll("\n"); - } - } - - pub fn readResponse(self: *McpHarness, arena: std.mem.Allocator) ![]const u8 { - const Streams = enum { stdout }; - var poller = std.io.poll(self.allocator, Streams, .{ .stdout = self.client_in }); - defer poller.deinit(); - - const r = poller.reader(.stdout); - - const timeout_ns = 2 * std.time.ns_per_s; - var timer = try std.time.Timer.start(); - - while (timer.read() < timeout_ns) { - const poll_result = try poller.pollTimeout(timeout_ns - timer.read()); - - if (!poll_result) return error.EndOfStream; - - const buffered = r.buffered(); - if (std.mem.indexOfScalar(u8, buffered, '\n')) |newline_idx| { - const line = buffered[0 .. newline_idx + 1]; - const result = try arena.dupe(u8, std.mem.trim(u8, line, " \r\n\t")); - r.toss(line.len); - return result; - } - } - - return error.Timeout; - } -};