mcp: optimize dispatching and simplify test harness

- Use StaticStringMap and enums for method, tool, and resource lookups.
- Implement comptime JSON minification for tool schemas.
- Refactor router and harness to use more efficient buffered polling.
- Consolidate integration tests and add synchronous unit tests.
This commit is contained in:
Adrià Arrufat
2026-03-02 20:53:14 +09:00
parent a7872aa054
commit 73565c4493
6 changed files with 357 additions and 350 deletions

View File

@@ -12,48 +12,59 @@ pub fn processRequests(server: *Server, in_stream: std.fs.File) !void {
var poller = std.io.poll(server.allocator, Streams, .{ .stdin = in_stream });
defer poller.deinit();
var buffer = std.ArrayListUnmanaged(u8).empty;
defer buffer.deinit(server.allocator);
const r = poller.reader(.stdin);
while (server.is_running.load(.acquire)) {
const poll_result = try poller.pollTimeout(100 * std.time.ns_per_ms);
if (poll_result) {
const data = try poller.toOwnedSlice(.stdin);
if (data.len == 0) {
server.is_running.store(false, .release);
break;
}
try buffer.appendSlice(server.allocator, data);
server.allocator.free(data);
if (!poll_result) {
// EOF or all streams closed
server.is_running.store(false, .release);
break;
}
while (std.mem.indexOfScalar(u8, buffer.items, '\n')) |newline_idx| {
const line = try server.allocator.dupe(u8, buffer.items[0..newline_idx]);
defer server.allocator.free(line);
while (true) {
const buffered = r.buffered();
const newline_idx = std.mem.indexOfScalar(u8, buffered, '\n') orelse break;
const line = buffered[0 .. newline_idx + 1];
const remaining = buffer.items.len - (newline_idx + 1);
std.mem.copyForwards(u8, buffer.items[0..remaining], buffer.items[newline_idx + 1 ..]);
buffer.items.len = remaining;
const trimmed = std.mem.trim(u8, line, " \r\n\t");
if (trimmed.len > 0) {
var arena = std.heap.ArenaAllocator.init(server.allocator);
defer arena.deinit();
// Ignore empty lines (e.g. from deinit unblock)
const trimmed = std.mem.trim(u8, line, " \r\t");
if (trimmed.len == 0) continue;
handleMessage(server, arena.allocator(), trimmed) catch |err| {
log.err(.mcp, "Failed to handle message", .{ .err = err, .msg = trimmed });
};
}
var arena = std.heap.ArenaAllocator.init(server.allocator);
defer arena.deinit();
handleMessage(server, arena.allocator(), trimmed) catch |err| {
log.err(.mcp, "Failed to handle message", .{ .err = err, .msg = trimmed });
};
r.toss(line.len);
}
}
}
const log = @import("../log.zig");
fn handleMessage(server: *Server, arena: std.mem.Allocator, msg: []const u8) !void {
const req = std.json.parseFromSlice(protocol.Request, arena, msg, .{
/// Every JSON-RPC method the MCP server dispatches on. Variant names are the
/// exact wire-format method strings (hence the @"..." quoting for the names
/// that contain '/').
const Method = enum {
    initialize,
    @"notifications/initialized",
    @"tools/list",
    @"tools/call",
    @"resources/list",
    @"resources/read",
};

/// Wire-format method string -> `Method` variant lookup table.
/// Built at comptime from the enum's own fields, so adding a variant to
/// `Method` automatically extends the table; the hand-written entry list this
/// replaces had to be kept in sync manually and could silently go stale.
const method_map = std.StaticStringMap(Method).initComptime(blk: {
    const fields = @typeInfo(Method).@"enum".fields;
    var kvs: [fields.len]struct { []const u8, Method } = undefined;
    for (fields, 0..) |field, i| {
        kvs[i] = .{ field.name, @field(Method, field.name) };
    }
    break :blk kvs;
});
pub fn handleMessage(server: *Server, arena: std.mem.Allocator, msg: []const u8) !void {
const req = std.json.parseFromSliceLeaky(protocol.Request, arena, msg, .{
.ignore_unknown_fields = true,
}) catch |err| {
log.warn(.mcp, "JSON Parse Error", .{ .err = err, .msg = msg });
@@ -61,40 +72,30 @@ fn handleMessage(server: *Server, arena: std.mem.Allocator, msg: []const u8) !vo
return;
};
if (std.mem.eql(u8, req.value.method, "initialize")) {
return handleInitialize(server, req.value);
}
if (std.mem.eql(u8, req.value.method, "notifications/initialized")) {
// nothing to do
const method = method_map.get(req.method) orelse {
if (req.id != null) {
try server.sendError(req.id.?, .MethodNotFound, "Method not found");
}
return;
}
};
if (std.mem.eql(u8, req.value.method, "tools/list")) {
return tools.handleList(server, arena, req.value);
}
if (std.mem.eql(u8, req.value.method, "tools/call")) {
return tools.handleCall(server, arena, req.value);
}
if (std.mem.eql(u8, req.value.method, "resources/list")) {
return resources.handleList(server, req.value);
}
if (std.mem.eql(u8, req.value.method, "resources/read")) {
return resources.handleRead(server, arena, req.value);
}
if (req.value.id != null) {
return server.sendError(req.value.id.?, .MethodNotFound, "Method not found");
switch (method) {
.initialize => try handleInitialize(server, req),
.@"notifications/initialized" => {},
.@"tools/list" => try tools.handleList(server, arena, req),
.@"tools/call" => try tools.handleCall(server, arena, req),
.@"resources/list" => try resources.handleList(server, req),
.@"resources/read" => try resources.handleRead(server, arena, req),
}
}
fn handleInitialize(server: *Server, req: protocol.Request) !void {
const result = protocol.InitializeResult{
.protocolVersion = "2025-11-25",
.capabilities = .{},
.capabilities = .{
.resources = .{},
.tools = .{},
},
.serverInfo = .{
.name = "lightpanda",
.version = "0.1.0",
@@ -107,33 +108,43 @@ fn handleInitialize(server: *Server, req: protocol.Request) !void {
const testing = @import("../testing.zig");
const McpHarness = @import("testing.zig").McpHarness;
test "handleMessage - ParseError" {
test "handleMessage - synchronous unit tests" {
// We need a server, but we want it to write to our fbs
// Server.init currently takes std.fs.File, we might need to refactor it
// to take a generic writer if we want to be truly "cranky" and avoid OS files.
// For now, let's use the harness as it's already set up, but call handleMessage directly.
const harness = try McpHarness.init(testing.allocator, testing.test_app);
defer harness.deinit();
harness.thread = try std.Thread.spawn(.{}, wrapTest, .{ testParseErrorInternal, harness });
try harness.runServer();
}
/// Runs `func` against the harness and records its outcome in
/// `harness.test_error` (null on success, the returned error otherwise), then
/// shuts the server loop down so the main test thread can join.
fn wrapTest(comptime func: fn (*McpHarness) anyerror!void, harness: *McpHarness) void {
    // Capture the result as an expression: success clears any prior error.
    harness.test_error = if (func(harness)) |_| null else |err| err;
    // Signal the server's processing loop to stop...
    harness.server.is_running.store(false, .release);
    // ...and write a newline so a blocked poll wakes up and observes the flag.
    _ = harness.client_out.writeAll("\n") catch {};
}
fn testParseErrorInternal(harness: *McpHarness) !void {
var arena = std.heap.ArenaAllocator.init(harness.allocator);
var arena = std.heap.ArenaAllocator.init(testing.allocator);
defer arena.deinit();
const aa = arena.allocator();
try harness.sendRequest("invalid json");
// 1. Valid request
try handleMessage(harness.server, aa,
\\{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}
);
const resp1 = try harness.readResponse(aa);
try testing.expect(std.mem.indexOf(u8, resp1, "\"id\":1") != null);
try testing.expect(std.mem.indexOf(u8, resp1, "\"name\":\"lightpanda\"") != null);
const response = try harness.readResponse(arena.allocator());
try testing.expect(std.mem.indexOf(u8, response, "\"id\":null") != null);
try testing.expect(std.mem.indexOf(u8, response, "\"code\":-32700") != null);
// 2. Method not found
try handleMessage(harness.server, aa,
\\{"jsonrpc":"2.0","id":2,"method":"unknown_method"}
);
const resp2 = try harness.readResponse(aa);
try testing.expect(std.mem.indexOf(u8, resp2, "\"id\":2") != null);
try testing.expect(std.mem.indexOf(u8, resp2, "\"code\":-32601") != null);
// 3. Parse error
{
const old_filter = log.opts.filter_scopes;
log.opts.filter_scopes = &.{.mcp};
defer log.opts.filter_scopes = old_filter;
try handleMessage(harness.server, aa, "invalid json");
const resp3 = try harness.readResponse(aa);
try testing.expect(std.mem.indexOf(u8, resp3, "\"id\":null") != null);
try testing.expect(std.mem.indexOf(u8, resp3, "\"code\":-32700") != null);
}
}