mcp: improve robustness of server and test harness

- Refactor router and test harness for non-blocking I/O using buffered polling.
- Implement reliable test failure reporting from sub-threads to the main test runner.
- Encapsulate pipe management using idiomatic std.fs.File methods.
- Fix invalid JSON generation in resource streaming due to duplicate fields.
- Improve shutdown sequence for clean test exits.
This commit is contained in:
Adrià Arrufat
2026-03-02 16:50:47 +09:00
parent 64107f5957
commit a7872aa054
4 changed files with 203 additions and 174 deletions

View File

@@ -1,8 +1,5 @@
const std = @import("std");
const lp = @import("lightpanda");
const log = lp.log;
const protocol = @import("protocol.zig");
const resources = @import("resources.zig");
const Server = @import("Server.zig");
@@ -15,55 +12,48 @@ pub fn processRequests(server: *Server, in_stream: std.fs.File) !void {
var poller = std.io.poll(server.allocator, Streams, .{ .stdin = in_stream });
defer poller.deinit();
const reader = poller.reader(.stdin);
var arena_instance = std.heap.ArenaAllocator.init(server.allocator);
defer arena_instance.deinit();
const arena = arena_instance.allocator();
var buffer = std.ArrayListUnmanaged(u8).empty;
defer buffer.deinit(server.allocator);
while (server.is_running.load(.acquire)) {
// Run ready browser tasks and get time to next one
const ms_to_next_task = (try server.browser.runMacrotasks()) orelse 10_000;
const poll_result = try poller.pollTimeout(100 * std.time.ns_per_ms);
// Keep the loop responsive to network events and stdin.
const ms_to_wait: u64 = @min(50, ms_to_next_task);
// Wait for stdin activity for up to ms_to_wait.
const poll_result = try poller.pollTimeout(ms_to_wait * @as(u64, std.time.ns_per_ms));
// Process any pending network I/O
_ = try server.http_client.tick(0);
// Process all complete lines available in the buffer
while (true) {
const buffered = reader.buffered();
if (std.mem.indexOfScalar(u8, buffered, '\n')) |idx| {
const line = buffered[0..idx];
if (line.len > 0) {
handleMessage(server, arena, line) catch |err| {
log.warn(.mcp, "Error processing message", .{ .err = err });
};
_ = arena_instance.reset(.{ .retain_with_limit = 32 * 1024 });
}
reader.toss(idx + 1);
} else {
if (poll_result) {
const data = try poller.toOwnedSlice(.stdin);
if (data.len == 0) {
server.is_running.store(false, .release);
break;
}
try buffer.appendSlice(server.allocator, data);
server.allocator.free(data);
}
// pollTimeout returns false when all streams are closed (EOF on stdin)
if (!poll_result) {
const buffered = reader.buffered();
if (buffered.len > 0) {
handleMessage(server, arena, buffered) catch {};
}
break;
while (std.mem.indexOfScalar(u8, buffer.items, '\n')) |newline_idx| {
const line = try server.allocator.dupe(u8, buffer.items[0..newline_idx]);
defer server.allocator.free(line);
const remaining = buffer.items.len - (newline_idx + 1);
std.mem.copyForwards(u8, buffer.items[0..remaining], buffer.items[newline_idx + 1 ..]);
buffer.items.len = remaining;
// Ignore empty lines (e.g. from deinit unblock)
const trimmed = std.mem.trim(u8, line, " \r\t");
if (trimmed.len == 0) continue;
var arena = std.heap.ArenaAllocator.init(server.allocator);
defer arena.deinit();
handleMessage(server, arena.allocator(), trimmed) catch |err| {
log.err(.mcp, "Failed to handle message", .{ .err = err, .msg = trimmed });
};
}
}
}
const log = @import("../log.zig");
fn handleMessage(server: *Server, arena: std.mem.Allocator, msg: []const u8) !void {
const parsed = std.json.parseFromSliceLeaky(protocol.Request, arena, msg, .{
const req = std.json.parseFromSlice(protocol.Request, arena, msg, .{
.ignore_unknown_fields = true,
}) catch |err| {
log.warn(.mcp, "JSON Parse Error", .{ .err = err, .msg = msg });
@@ -71,40 +61,42 @@ fn handleMessage(server: *Server, arena: std.mem.Allocator, msg: []const u8) !vo
return;
};
if (parsed.id == null) {
// It's a notification
if (std.mem.eql(u8, parsed.method, "notifications/initialized")) {
log.info(.mcp, "Client Initialized", .{});
}
if (std.mem.eql(u8, req.value.method, "initialize")) {
return handleInitialize(server, req.value);
}
if (std.mem.eql(u8, req.value.method, "notifications/initialized")) {
// nothing to do
return;
}
if (std.mem.eql(u8, parsed.method, "initialize")) {
try handleInitialize(server, parsed);
} else if (std.mem.eql(u8, parsed.method, "resources/list")) {
try resources.handleList(server, parsed);
} else if (std.mem.eql(u8, parsed.method, "resources/read")) {
try resources.handleRead(server, arena, parsed);
} else if (std.mem.eql(u8, parsed.method, "tools/list")) {
try tools.handleList(server, arena, parsed);
} else if (std.mem.eql(u8, parsed.method, "tools/call")) {
try tools.handleCall(server, arena, parsed);
} else {
try server.sendError(parsed.id.?, .MethodNotFound, "Method not found");
if (std.mem.eql(u8, req.value.method, "tools/list")) {
return tools.handleList(server, arena, req.value);
}
if (std.mem.eql(u8, req.value.method, "tools/call")) {
return tools.handleCall(server, arena, req.value);
}
if (std.mem.eql(u8, req.value.method, "resources/list")) {
return resources.handleList(server, req.value);
}
if (std.mem.eql(u8, req.value.method, "resources/read")) {
return resources.handleRead(server, arena, req.value);
}
if (req.value.id != null) {
return server.sendError(req.value.id.?, .MethodNotFound, "Method not found");
}
}
fn handleInitialize(server: *Server, req: protocol.Request) !void {
const result = protocol.InitializeResult{
.protocolVersion = "2025-11-25",
.capabilities = .{
.logging = .{},
.prompts = .{ .listChanged = false },
.resources = .{ .subscribe = false, .listChanged = false },
.tools = .{ .listChanged = false },
},
.capabilities = .{},
.serverInfo = .{
.name = "lightpanda-mcp",
.name = "lightpanda",
.version = "0.1.0",
},
};
@@ -119,19 +111,29 @@ test "handleMessage - ParseError" {
const harness = try McpHarness.init(testing.allocator, testing.test_app);
defer harness.deinit();
harness.thread = try std.Thread.spawn(.{}, testParseError, .{harness});
harness.thread = try std.Thread.spawn(.{}, wrapTest, .{ testParseErrorInternal, harness });
try harness.runServer();
}
fn testParseError(harness: *McpHarness) void {
defer harness.server.is_running.store(false, .release);
fn wrapTest(comptime func: fn (*McpHarness) anyerror!void, harness: *McpHarness) void {
const res = func(harness);
if (res) |_| {
harness.test_error = null;
} else |err| {
harness.test_error = err;
}
harness.server.is_running.store(false, .release);
// Ensure we trigger a poll wake up if needed
_ = harness.client_out.writeAll("\n") catch {};
}
fn testParseErrorInternal(harness: *McpHarness) !void {
var arena = std.heap.ArenaAllocator.init(harness.allocator);
defer arena.deinit();
harness.sendRequest("invalid json") catch return;
try harness.sendRequest("invalid json");
const response = harness.readResponse(arena.allocator()) catch return;
testing.expect(std.mem.indexOf(u8, response, "\"id\":null") != null) catch return;
testing.expect(std.mem.indexOf(u8, response, "\"code\":-32700") != null) catch return;
const response = try harness.readResponse(arena.allocator());
try testing.expect(std.mem.indexOf(u8, response, "\"id\":null") != null);
try testing.expect(std.mem.indexOf(u8, response, "\"code\":-32700") != null);
}