mirror of
https://github.com/lightpanda-io/browser.git
synced 2026-03-22 04:34:44 +00:00
mcp: simplify I/O architecture and remove test harness
This commit is contained in:
10
src/main.zig
10
src/main.zig
@@ -136,10 +136,16 @@ fn run(allocator: Allocator, main_arena: Allocator) !void {
|
||||
|
||||
log.opts.format = .logfmt;
|
||||
|
||||
var mcp_server = try lp.mcp.Server.init(allocator, app, std.fs.File.stdout());
|
||||
var stdout_buf: [4096]u8 = undefined;
|
||||
var stdout = std.fs.File.stdout().writer(&stdout_buf);
|
||||
|
||||
var mcp_server = try lp.mcp.Server.init(allocator, app, &stdout.interface);
|
||||
defer mcp_server.deinit();
|
||||
|
||||
try lp.mcp.router.processRequests(mcp_server, std.fs.File.stdin());
|
||||
var stdin_buf: [4096]u8 = undefined;
|
||||
var stdin = std.fs.File.stdin().reader(&stdin_buf);
|
||||
|
||||
try lp.mcp.router.processRequests(mcp_server, &stdin.interface);
|
||||
},
|
||||
else => unreachable,
|
||||
}
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
pub const Server = @import("mcp/Server.zig");
|
||||
pub const protocol = @import("mcp/protocol.zig");
|
||||
pub const router = @import("mcp/router.zig");
|
||||
pub const testing = @import("mcp/testing.zig");
|
||||
|
||||
@@ -4,7 +4,10 @@ const lp = @import("lightpanda");
|
||||
|
||||
const App = @import("../App.zig");
|
||||
const HttpClient = @import("../http/Client.zig");
|
||||
const testing = @import("../testing.zig");
|
||||
const protocol = @import("protocol.zig");
|
||||
const router = @import("router.zig");
|
||||
|
||||
const Self = @This();
|
||||
|
||||
allocator: std.mem.Allocator,
|
||||
@@ -16,16 +19,15 @@ browser: lp.Browser,
|
||||
session: *lp.Session,
|
||||
page: *lp.Page,
|
||||
|
||||
is_running: std.atomic.Value(bool) = .init(false),
|
||||
out_stream: std.fs.File,
|
||||
writer: *std.io.Writer,
|
||||
|
||||
pub fn init(allocator: std.mem.Allocator, app: *App, out_stream: std.fs.File) !*Self {
|
||||
pub fn init(allocator: std.mem.Allocator, app: *App, writer: *std.io.Writer) !*Self {
|
||||
const self = try allocator.create(Self);
|
||||
errdefer allocator.destroy(self);
|
||||
|
||||
self.allocator = allocator;
|
||||
self.app = app;
|
||||
self.out_stream = out_stream;
|
||||
self.writer = writer;
|
||||
|
||||
self.http_client = try app.http.createClient(allocator);
|
||||
errdefer self.http_client.deinit();
|
||||
@@ -43,8 +45,6 @@ pub fn init(allocator: std.mem.Allocator, app: *App, out_stream: std.fs.File) !*
|
||||
}
|
||||
|
||||
pub fn deinit(self: *Self) void {
|
||||
self.is_running.store(false, .release);
|
||||
|
||||
self.browser.deinit();
|
||||
self.notification.deinit();
|
||||
self.http_client.deinit();
|
||||
@@ -53,11 +53,11 @@ pub fn deinit(self: *Self) void {
|
||||
}
|
||||
|
||||
pub fn sendResponse(self: *Self, response: anytype) !void {
|
||||
var aw: std.Io.Writer.Allocating = .init(self.allocator);
|
||||
var aw: std.io.Writer.Allocating = .init(self.allocator);
|
||||
defer aw.deinit();
|
||||
try std.json.Stringify.value(response, .{ .emit_null_optional_fields = false }, &aw.writer);
|
||||
try aw.writer.writeByte('\n');
|
||||
try self.out_stream.writeAll(aw.written());
|
||||
try self.writer.writeAll(aw.writer.buffered());
|
||||
}
|
||||
|
||||
pub fn sendResult(self: *Self, id: std.json.Value, result: anytype) !void {
|
||||
@@ -82,67 +82,27 @@ pub fn sendError(self: *Self, id: std.json.Value, code: protocol.ErrorCode, mess
|
||||
});
|
||||
}
|
||||
|
||||
const testing = @import("../testing.zig");
|
||||
const McpHarness = @import("testing.zig").McpHarness;
|
||||
test "MCP Integration: synchronous smoke test" {
|
||||
const allocator = testing.allocator;
|
||||
const app = testing.test_app;
|
||||
|
||||
test "MCP Integration: smoke test" {
|
||||
const harness = try McpHarness.init(testing.allocator, testing.test_app);
|
||||
defer harness.deinit();
|
||||
|
||||
harness.thread = try std.Thread.spawn(.{}, testIntegrationSmokeInternal, .{harness});
|
||||
try harness.runServer();
|
||||
}
|
||||
|
||||
fn testIntegrationSmokeInternal(harness: *McpHarness) void {
|
||||
const aa = harness.allocator;
|
||||
var arena = std.heap.ArenaAllocator.init(aa);
|
||||
defer arena.deinit();
|
||||
const allocator = arena.allocator();
|
||||
|
||||
harness.sendRequest(
|
||||
const input =
|
||||
\\{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"test-client","version":"1.0.0"}}}
|
||||
) catch |err| {
|
||||
harness.test_error = err;
|
||||
return;
|
||||
};
|
||||
|
||||
const response1 = harness.readResponse(allocator) catch |err| {
|
||||
harness.test_error = err;
|
||||
return;
|
||||
};
|
||||
testing.expect(std.mem.indexOf(u8, response1, "\"id\":1") != null) catch |err| {
|
||||
harness.test_error = err;
|
||||
return;
|
||||
};
|
||||
testing.expect(std.mem.indexOf(u8, response1, "\"tools\":{}") != null) catch |err| {
|
||||
harness.test_error = err;
|
||||
return;
|
||||
};
|
||||
testing.expect(std.mem.indexOf(u8, response1, "\"resources\":{}") != null) catch |err| {
|
||||
harness.test_error = err;
|
||||
return;
|
||||
};
|
||||
|
||||
harness.sendRequest(
|
||||
\\{"jsonrpc":"2.0","id":2,"method":"tools/list"}
|
||||
) catch |err| {
|
||||
harness.test_error = err;
|
||||
return;
|
||||
};
|
||||
;
|
||||
|
||||
const response2 = harness.readResponse(allocator) catch |err| {
|
||||
harness.test_error = err;
|
||||
return;
|
||||
};
|
||||
testing.expect(std.mem.indexOf(u8, response2, "\"id\":2") != null) catch |err| {
|
||||
harness.test_error = err;
|
||||
return;
|
||||
};
|
||||
testing.expect(std.mem.indexOf(u8, response2, "\"name\":\"goto\"") != null) catch |err| {
|
||||
harness.test_error = err;
|
||||
return;
|
||||
};
|
||||
var in_reader: std.io.Reader = .fixed(input);
|
||||
var out_alloc: std.io.Writer.Allocating = .init(allocator);
|
||||
defer out_alloc.deinit();
|
||||
|
||||
harness.server.is_running.store(false, .release);
|
||||
_ = harness.client_out.writeAll("\n") catch {};
|
||||
var server: *Self = try .init(allocator, app, &out_alloc.writer);
|
||||
defer server.deinit();
|
||||
|
||||
try router.processRequests(server, &in_reader);
|
||||
|
||||
const output = out_alloc.writer.buffered();
|
||||
try testing.expect(std.mem.indexOf(u8, output, "\"id\":1") != null);
|
||||
try testing.expect(std.mem.indexOf(u8, output, "\"tools\":{}") != null);
|
||||
try testing.expect(std.mem.indexOf(u8, output, "\"id\":2") != null);
|
||||
try testing.expect(std.mem.indexOf(u8, output, "\"name\":\"goto\"") != null);
|
||||
}
|
||||
|
||||
@@ -5,40 +5,27 @@ const resources = @import("resources.zig");
|
||||
const Server = @import("Server.zig");
|
||||
const tools = @import("tools.zig");
|
||||
|
||||
pub fn processRequests(server: *Server, in_stream: std.fs.File) !void {
|
||||
server.is_running.store(true, .release);
|
||||
pub fn processRequests(server: *Server, reader: *std.io.Reader) !void {
|
||||
var arena: std.heap.ArenaAllocator = .init(server.allocator);
|
||||
defer arena.deinit();
|
||||
|
||||
const Streams = enum { stdin };
|
||||
var poller = std.io.poll(server.allocator, Streams, .{ .stdin = in_stream });
|
||||
defer poller.deinit();
|
||||
while (true) {
|
||||
_ = arena.reset(.retain_capacity);
|
||||
const aa = arena.allocator();
|
||||
|
||||
const r = poller.reader(.stdin);
|
||||
const buffered_line = reader.takeDelimiter('\n') catch |err| switch (err) {
|
||||
error.StreamTooLong => {
|
||||
log.err(.mcp, "Message too long", .{});
|
||||
continue;
|
||||
},
|
||||
else => return err,
|
||||
} orelse break;
|
||||
|
||||
while (server.is_running.load(.acquire)) {
|
||||
const poll_result = try poller.pollTimeout(100 * std.time.ns_per_ms);
|
||||
|
||||
if (!poll_result) {
|
||||
// EOF or all streams closed
|
||||
server.is_running.store(false, .release);
|
||||
break;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
const buffered = r.buffered();
|
||||
const newline_idx = std.mem.indexOfScalar(u8, buffered, '\n') orelse break;
|
||||
const line = buffered[0 .. newline_idx + 1];
|
||||
|
||||
const trimmed = std.mem.trim(u8, line, " \r\n\t");
|
||||
if (trimmed.len > 0) {
|
||||
var arena = std.heap.ArenaAllocator.init(server.allocator);
|
||||
defer arena.deinit();
|
||||
|
||||
handleMessage(server, arena.allocator(), trimmed) catch |err| {
|
||||
log.err(.mcp, "Failed to handle message", .{ .err = err, .msg = trimmed });
|
||||
};
|
||||
}
|
||||
|
||||
r.toss(line.len);
|
||||
const trimmed = std.mem.trim(u8, buffered_line, " \r\t");
|
||||
if (trimmed.len > 0) {
|
||||
handleMessage(server, aa, trimmed) catch |err| {
|
||||
log.err(.mcp, "Failed to handle message", .{ .err = err, .msg = trimmed });
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -106,35 +93,36 @@ fn handleInitialize(server: *Server, req: protocol.Request) !void {
|
||||
}
|
||||
|
||||
const testing = @import("../testing.zig");
|
||||
const McpHarness = @import("testing.zig").McpHarness;
|
||||
|
||||
test "handleMessage - synchronous unit tests" {
|
||||
// We need a server, but we want it to write to our fbs
|
||||
// Server.init currently takes std.fs.File, we might need to refactor it
|
||||
// to take a generic writer if we want to be truly "cranky" and avoid OS files.
|
||||
// For now, let's use the harness as it's already set up, but call handleMessage directly.
|
||||
const harness = try McpHarness.init(testing.allocator, testing.test_app);
|
||||
defer harness.deinit();
|
||||
const allocator = testing.allocator;
|
||||
const app = testing.test_app;
|
||||
|
||||
var arena = std.heap.ArenaAllocator.init(testing.allocator);
|
||||
var out_alloc = std.io.Writer.Allocating.init(allocator);
|
||||
defer out_alloc.deinit();
|
||||
|
||||
var server = try Server.init(allocator, app, &out_alloc.writer);
|
||||
defer server.deinit();
|
||||
|
||||
var arena = std.heap.ArenaAllocator.init(allocator);
|
||||
defer arena.deinit();
|
||||
const aa = arena.allocator();
|
||||
|
||||
// 1. Valid request
|
||||
try handleMessage(harness.server, aa,
|
||||
try handleMessage(server, aa,
|
||||
\\{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}
|
||||
);
|
||||
const resp1 = try harness.readResponse(aa);
|
||||
try testing.expect(std.mem.indexOf(u8, resp1, "\"id\":1") != null);
|
||||
try testing.expect(std.mem.indexOf(u8, resp1, "\"name\":\"lightpanda\"") != null);
|
||||
try testing.expect(std.mem.indexOf(u8, out_alloc.writer.buffered(), "\"id\":1") != null);
|
||||
try testing.expect(std.mem.indexOf(u8, out_alloc.writer.buffered(), "\"name\":\"lightpanda\"") != null);
|
||||
out_alloc.writer.end = 0;
|
||||
|
||||
// 2. Method not found
|
||||
try handleMessage(harness.server, aa,
|
||||
try handleMessage(server, aa,
|
||||
\\{"jsonrpc":"2.0","id":2,"method":"unknown_method"}
|
||||
);
|
||||
const resp2 = try harness.readResponse(aa);
|
||||
try testing.expect(std.mem.indexOf(u8, resp2, "\"id\":2") != null);
|
||||
try testing.expect(std.mem.indexOf(u8, resp2, "\"code\":-32601") != null);
|
||||
try testing.expect(std.mem.indexOf(u8, out_alloc.writer.buffered(), "\"id\":2") != null);
|
||||
try testing.expect(std.mem.indexOf(u8, out_alloc.writer.buffered(), "\"code\":-32601") != null);
|
||||
out_alloc.writer.end = 0;
|
||||
|
||||
// 3. Parse error
|
||||
{
|
||||
@@ -142,9 +130,8 @@ test "handleMessage - synchronous unit tests" {
|
||||
log.opts.filter_scopes = &.{.mcp};
|
||||
defer log.opts.filter_scopes = old_filter;
|
||||
|
||||
try handleMessage(harness.server, aa, "invalid json");
|
||||
const resp3 = try harness.readResponse(aa);
|
||||
try testing.expect(std.mem.indexOf(u8, resp3, "\"id\":null") != null);
|
||||
try testing.expect(std.mem.indexOf(u8, resp3, "\"code\":-32700") != null);
|
||||
try handleMessage(server, aa, "invalid json");
|
||||
try testing.expect(std.mem.indexOf(u8, out_alloc.writer.buffered(), "\"id\":null") != null);
|
||||
try testing.expect(std.mem.indexOf(u8, out_alloc.writer.buffered(), "\"code\":-32700") != null);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,130 +0,0 @@
|
||||
const std = @import("std");
|
||||
const lp = @import("lightpanda");
|
||||
const App = @import("../App.zig");
|
||||
const Server = @import("Server.zig");
|
||||
const router = @import("router.zig");
|
||||
|
||||
pub const McpHarness = struct {
|
||||
allocator: std.mem.Allocator,
|
||||
app: *App,
|
||||
server: *Server,
|
||||
|
||||
// Client view of the communication
|
||||
client_in: std.fs.File, // Client reads from this (server's stdout)
|
||||
client_out: std.fs.File, // Client writes to this (server's stdin)
|
||||
|
||||
// Server view of the communication
|
||||
server_in: std.fs.File, // Server reads from this (client's stdout)
|
||||
server_out: std.fs.File, // Server writes to this (client's stdin)
|
||||
|
||||
thread: ?std.Thread = null,
|
||||
test_error: ?anyerror = null,
|
||||
|
||||
const Pipe = struct {
|
||||
read: std.fs.File,
|
||||
write: std.fs.File,
|
||||
|
||||
fn init() !Pipe {
|
||||
const fds = try std.posix.pipe();
|
||||
return .{
|
||||
.read = .{ .handle = fds[0] },
|
||||
.write = .{ .handle = fds[1] },
|
||||
};
|
||||
}
|
||||
|
||||
fn close(self: Pipe) void {
|
||||
self.read.close();
|
||||
self.write.close();
|
||||
}
|
||||
};
|
||||
|
||||
pub fn init(allocator: std.mem.Allocator, app: *App) !*McpHarness {
|
||||
const self = try allocator.create(McpHarness);
|
||||
errdefer allocator.destroy(self);
|
||||
|
||||
self.allocator = allocator;
|
||||
self.app = app;
|
||||
self.thread = null;
|
||||
self.test_error = null;
|
||||
|
||||
const stdin_pipe = try Pipe.init();
|
||||
errdefer stdin_pipe.close();
|
||||
|
||||
const stdout_pipe = try Pipe.init();
|
||||
errdefer {
|
||||
stdin_pipe.close();
|
||||
stdout_pipe.close();
|
||||
}
|
||||
|
||||
self.server_in = stdin_pipe.read;
|
||||
self.client_out = stdin_pipe.write;
|
||||
self.client_in = stdout_pipe.read;
|
||||
self.server_out = stdout_pipe.write;
|
||||
|
||||
self.server = try Server.init(allocator, app, self.server_out);
|
||||
errdefer self.server.deinit();
|
||||
|
||||
return self;
|
||||
}
|
||||
|
||||
pub fn deinit(self: *McpHarness) void {
|
||||
self.server.is_running.store(false, .release);
|
||||
|
||||
// Wake up the server's poll loop by writing a newline
|
||||
self.client_out.writeAll("\n") catch {};
|
||||
|
||||
// Closing the client's output will also send EOF to the server
|
||||
self.client_out.close();
|
||||
|
||||
if (self.thread) |t| t.join();
|
||||
|
||||
self.server.deinit();
|
||||
|
||||
// Server handles are closed here if they weren't already
|
||||
self.server_in.close();
|
||||
self.server_out.close();
|
||||
self.client_in.close();
|
||||
// self.client_out is already closed above
|
||||
|
||||
self.allocator.destroy(self);
|
||||
}
|
||||
|
||||
pub fn runServer(self: *McpHarness) !void {
|
||||
try router.processRequests(self.server, self.server_in);
|
||||
if (self.test_error) |err| return err;
|
||||
}
|
||||
|
||||
pub fn sendRequest(self: *McpHarness, request_json: []const u8) !void {
|
||||
try self.client_out.writeAll(request_json);
|
||||
if (request_json.len > 0 and request_json[request_json.len - 1] != '\n') {
|
||||
try self.client_out.writeAll("\n");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn readResponse(self: *McpHarness, arena: std.mem.Allocator) ![]const u8 {
|
||||
const Streams = enum { stdout };
|
||||
var poller = std.io.poll(self.allocator, Streams, .{ .stdout = self.client_in });
|
||||
defer poller.deinit();
|
||||
|
||||
const r = poller.reader(.stdout);
|
||||
|
||||
const timeout_ns = 2 * std.time.ns_per_s;
|
||||
var timer = try std.time.Timer.start();
|
||||
|
||||
while (timer.read() < timeout_ns) {
|
||||
const poll_result = try poller.pollTimeout(timeout_ns - timer.read());
|
||||
|
||||
if (!poll_result) return error.EndOfStream;
|
||||
|
||||
const buffered = r.buffered();
|
||||
if (std.mem.indexOfScalar(u8, buffered, '\n')) |newline_idx| {
|
||||
const line = buffered[0 .. newline_idx + 1];
|
||||
const result = try arena.dupe(u8, std.mem.trim(u8, line, " \r\n\t"));
|
||||
r.toss(line.len);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
return error.Timeout;
|
||||
}
|
||||
};
|
||||
Reference in New Issue
Block a user